FS2-Kafka in the real world - Part 1
FS2 Kafka is a library that lets us connect Apache Kafka to FS2 streams. It provides streams of Kafka consumers and producers and is inspired by Alpakka-Kafka.
There are a few important concepts to understand before we can recognise patterns in FS2 Kafka. Since FS2-Kafka is built on top of FS2, let's first look at some operations on FS2 streams that are used widely in FS2 Kafka.
Pipe operation
type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]
A Pipe is a transformation from one stream to another. Pipes are generally used with the through function.
def through[O2](f: Pipe[F, O, O2]): Stream[F, O2] = f(this)
// variance rules skipped for brevity
Here is an example of how using through and Pipe can reduce code size and introduce composition.
Stream("Hello","FS2").through(text.utf8Encode)// converts a Stream[_,String] to Stream[_,Bytes]
The library's documentation is good and an excellent starting point. The goal of the quick example program is to read data from a Kafka topic and publish the data back. Let's look at each line of the quick example.
Lines 10–14 create ConsumerSettings[F, Key, Value]. Before we can create a Kafka consumer that streams records, we need these settings. There are four mandatory properties needed to create ConsumerSettings: an effect type F, Key and Value deserializers, and the bootstrap servers for Kafka. Deserializers for standard types like String, Array[Byte], Long, Int etc. are provided implicitly. We can also specify the deserializers explicitly when necessary.
ConsumerSettings(
  keyDeserializer = Deserializer[IO, String],
  valueDeserializer = Deserializer[IO, String]
).withBootstrapServers("localhost:9092")
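Beyond the mandatory properties, in practice you will usually also set a consumer group id and an offset-reset policy. A sketch using settings the library provides (the group id here is just a placeholder):

ConsumerSettings(
  keyDeserializer = Deserializer[IO, String],
  valueDeserializer = Deserializer[IO, String]
).withAutoOffsetReset(AutoOffsetReset.Earliest)
  .withBootstrapServers("localhost:9092")
  .withGroupId("group") // placeholder group id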
Lines 16–18 create ProducerSettings[F, Key, Value]. These settings are required to create a Kafka producer. Similar to ConsumerSettings, the effect type, key/value serializers and bootstrap servers are mandatory.
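Analogous to the consumer settings above, a ProducerSettings value can be built with explicit serializers. A sketch, assuming String keys and values:

ProducerSettings(
  keySerializer = Serializer[IO, String],
  valueSerializer = Serializer[IO, String]
).withBootstrapServers("localhost:9092")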
Line 21 uses KafkaConsumer.stream(consumerSettings) to create a KafkaConsumer instance in the Stream context. Remember that this is pure Scala, so we do everything in a context like Stream or F. The library provides extension methods for operating on a KafkaConsumer in a Stream context without needing to explicitly use operations such as flatMap and evalTap. We use the subscribeTo method to subscribe to a topic, as done in line 22. The extension method records, an alias for stream, is used in line 23 to create a stream of consumer records. Once we have a stream of records, our business logic takes over. But before going further, let's look at what exactly this stream of records is.
KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records // returns a Stream[IO, CommittableConsumerRecord[IO, String, String]]
CommittableConsumerRecord contains the Kafka record and an offset of type CommittableOffset. The offset carries information like the topic, partition and offset of the consumed record. This is helpful when we want to commit offsets downstream.
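As a rough sketch of what a single element of that stream exposes (field names as in fs2-kafka 1.x):

// each stream element pairs the consumed record with a committable offset
val committable: CommittableConsumerRecord[IO, String, String] = ???
committable.record // the ConsumerRecord[String, String]: key, value, topic, partition, ...
committable.offset // the CommittableOffset[IO] we can commit downstream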
Line 24 uses mapAsync from FS2 to evaluate elements of this stream (Kafka records) in parallel, with an upper limit of 25 records at a time. Our goal is to push each record back to Kafka, so in lines 26–30 we create a new ProducerRecord from each consumed record. By line 30 we have a stream of ProducerRecords[String, String, CommittableOffset[IO]]: we have taken each record read via the consumer, converted it into a producer record and used the offset as the passthrough value in the producer record.
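A sketch of what lines 21–30 add up to, assuming the fs2-kafka 1.x API in which ProducerRecords still carries a passthrough value (the topic name is a placeholder):

KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records
  .mapAsync(25) { committable =>
    IO.pure {
      // build a producer record from the consumed key/value,
      // keeping the committable offset as the passthrough
      val record = ProducerRecord("topic", committable.record.key, committable.record.value)
      ProducerRecords.one(record, committable.offset)
    }
  }
// Stream[IO, ProducerRecords[String, String, CommittableOffset[IO]]]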
Line 31 transforms a Stream[IO, ProducerRecords[..]] into a Stream[IO, ProducerResult[..]]. Let's look at this transformation in more detail.
KafkaProducer.pipe(producerSettings)
// creates a Pipe[IO, ProducerRecords[String, String, CommittableOffset[IO]], ProducerResult[String, String, CommittableOffset[IO]]]
Each producer record is passed to the KafkaProducer, pushed to Kafka, and a ProducerResult comes out the other end. This may seem magical, but that's the power of lambda expressions and Scala's conciseness.
Line 32 extracts the CommittableOffset from the ProducerResult, and then we apply another transformation: a Stream[IO, CommittableOffset] is transformed into a Stream[IO, Unit]. But wait, what happened to our CommittableOffset?
commitBatchWithin(500, 15.seconds) // creates a Pipe[F, CommittableOffset[F], Unit]
// commits offsets in batches of every n offsets or a time window of length d, whichever happens first
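Putting lines 31–33 together with the earlier sketch, the tail of the pipeline looks roughly like this (same placeholder names as before):

  // ...continuing the Stream[IO, ProducerRecords[String, String, CommittableOffset[IO]]] above
  .through(KafkaProducer.pipe(producerSettings)) // send records to Kafka, get ProducerResults back
  .map(_.passthrough)                            // recover each CommittableOffset[IO]
  .through(commitBatchWithin(500, 15.seconds))   // commit the offsets in batches
// Stream[IO, Unit]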
So, lines 21–33 are a series of operations on streams. We start by reading records from a Kafka topic and transform them into records that can be pushed back to Kafka. We push them to Kafka and, as we do that, we commit the offsets that were read. Of course, a stream is only a data structure that declares our intent. In line 35 we compile the Stream into a value of IO[Unit], and the IOApp takes care of running our stream.
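As a sketch, the ending might look like this inside an IOApp (the stream value here is a placeholder for the pipeline built above):

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object Main extends IOApp {
  val stream: Stream[IO, Unit] = ??? // the pipeline from lines 21-33

  def run(args: List[String]): IO[ExitCode] =
    stream.compile.drain.as(ExitCode.Success) // compile to IO[Unit]; IOApp runs it
}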
Hopefully after reading all this, you understand how to use FS2 Kafka to consume and produce data from Kafka using FS2 streams. In the next part of this series we will look at how to use all this inside an application that is not an IOApp; more specifically, how to use the FS2 Kafka producer when we do not have a stream of input data and the data arrives non-deterministically.
Further reading
- Apache Kafka — https://kafka.apache.org/
- FS2 — https://github.com/typelevel/fs2
- Data types and serialization — https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes
- Kafka consumer offset management — https://docs.confluent.io/platform/current/clients/consumer.html#offset-management