FS2-Kafka in the real world - Part 1

ayush mittal
Jan 30, 2022

FS2 Kafka is a library that connects Apache Kafka to FS2 streams. It provides streams of Kafka consumers and producers and is inspired by Alpakka-Kafka.

There are some important concepts to understand before we can recognise patterns in FS2 Kafka. First, since FS2 Kafka is built on top of FS2, let’s look at some operations on FS2 streams that are used widely in FS2 Kafka.

Pipe operation

type Pipe[F[_], -I, +O] = Stream[F, I] => Stream[F, O]

A Pipe is a transformation from one stream to another. Pipes are generally used with the through function.

def through[O2](f: Pipe[F, O, O2]): Stream[F, O2] = f(this)
// variance rules skipped for brevity

Here is an example of how through and a Pipe can reduce code size and enable composition.

Stream("Hello","FS2").through(text.utf8Encode)// converts a Stream[_,String] to Stream[_,Bytes]

The documentation of the library is good and is an excellent starting point. The quick example there reads data from a Kafka topic and publishes the data back to Kafka. Let’s look at it line by line.
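The walkthrough below refers to line numbers in that quick example. For reference, here is a sketch of it, adapted from the fs2-kafka documentation for the 2.x series; the topic name, group id and bootstrap servers are placeholders, and the line-number comments map to the references in the walkthrough, even if the exact positions in the original embedded snippet differ slightly.

import cats.effect.{IO, IOApp}
import fs2.kafka._
import scala.concurrent.duration._

object Main extends IOApp.Simple {
  val run: IO[Unit] = {
    // Business logic: here we simply pass the consumed key and value through.
    def processRecord(record: ConsumerRecord[String, String]): IO[(String, String)] =
      IO.pure(record.key -> record.value)

    val consumerSettings =                             // lines 10-14: consumer settings
      ConsumerSettings[IO, String, String]
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withBootstrapServers("localhost:9092")
        .withGroupId("group")

    val producerSettings =                             // lines 16-18: producer settings
      ProducerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")

    val stream =
      KafkaConsumer.stream(consumerSettings)           // line 21
        .subscribeTo("topic")                          // line 22
        .records                                       // line 23
        .mapAsync(25) { committable =>                 // line 24
          processRecord(committable.record).map { case (key, value) =>
            val record = ProducerRecord("topic", key, value)
            ProducerRecords.one(record, committable.offset)
          }                                            // lines 26-30
        }
        .through(KafkaProducer.pipe(producerSettings)) // line 31
        .map(_.passthrough)                            // line 32
        .through(commitBatchWithin(500, 15.seconds))   // line 33

    stream.compile.drain                               // line 35
  }
}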

Lines 10–14 create a ConsumerSettings[F, Key, Value]. Before we can create a Kafka consumer that streams records we need these settings. There are four mandatory properties: an effect type F, Key and Value deserializers, and the bootstrap servers for Kafka.

Deserializers for standard types like String, Array[Byte], Long, Int etc. are provided implicitly. We can also specify the deserializers explicitly when necessary.

ConsumerSettings(
  keyDeserializer = Deserializer[IO, String],
  valueDeserializer = Deserializer[IO, String]
).withBootstrapServers("localhost:9092")

Lines 16–18 create a ProducerSettings[F, Key, Value]. These settings are required to create a Kafka producer. As with ConsumerSettings, the effect type, key/value serializers and bootstrap servers are mandatory.
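For symmetry with the ConsumerSettings snippet above, a minimal ProducerSettings could look like this (String serializers are again provided implicitly):

val producerSettings =
  ProducerSettings[IO, String, String]
    .withBootstrapServers("localhost:9092")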

Line 21 uses KafkaConsumer.stream(consumerSettings) to create a KafkaConsumer instance in the Stream context. Remember that this is pure Scala, so we do everything inside a context like Stream or F. The library provides extension methods for operating on a KafkaConsumer in a Stream context without needing to explicitly use operations such as flatMap and evalTap. We use the subscribeTo method to subscribe to a topic in line 22. The extension method records, an alias for stream, is used in line 23 to create a stream of consumer records. Once we have a stream of records our business logic takes over. But before going further, let’s look at what exactly this stream of records is.

KafkaConsumer.stream(consumerSettings)
  .subscribeTo("topic")
  .records
// returns a Stream[IO, CommittableConsumerRecord[IO, String, String]]

A CommittableConsumerRecord contains the Kafka record and an offset of type CommittableOffset. The offset carries client data such as the topic, partition and offset. This information is helpful when we want to commit offsets downstream.
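As a small sketch, here is one way to peek at both parts of a CommittableConsumerRecord before handing the stream over to the business logic (logRecords is our own name; the field names are as in fs2-kafka 2.x):

import cats.effect.IO
import fs2.Stream
import fs2.kafka._

// Log the consumed key/value together with the partition of its committable offset.
def logRecords(
  records: Stream[IO, CommittableConsumerRecord[IO, String, String]]
): Stream[IO, CommittableConsumerRecord[IO, String, String]] =
  records.evalTap { committable =>
    IO.println(
      s"partition=${committable.offset.topicPartition} " +
        s"key=${committable.record.key} value=${committable.record.value}"
    )
  }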

Line 24 uses mapAsync from fs2 to evaluate elements of this stream (Kafka records) in parallel, with an upper limit of 25 records at a time. Our goal is to push each record back to Kafka, so in lines 26–30 we create a new ProducerRecord from each consumed record. By line 30 we have a stream of ProducerRecords[String, String, CommittableOffset[IO]]: we have taken each record read via the consumer, converted it into a producer record, and used the offset as the passthrough value in the producer record.
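In isolation, that per-record step looks roughly like this (toProducerRecords is our own name and the topic is a placeholder):

import cats.effect.IO
import fs2.kafka._

// Wrap the consumed key/value in a ProducerRecord and carry the
// consumer offset along as the passthrough value.
def toProducerRecords(committable: CommittableConsumerRecord[IO, String, String]) =
  ProducerRecords.one(
    ProducerRecord("topic", committable.record.key, committable.record.value),
    committable.offset
  )

// Plugged into the stream with mapAsync(25), evaluating up to 25 records in parallel:
// records.mapAsync(25)(committable => IO.pure(toProducerRecords(committable)))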

Line 31 transforms a Stream[IO, ProducerRecords[..]] into a Stream[IO, ProducerResult[..]]. Let’s look at this transformation in more detail.

KafkaProducer.pipe(producerSettings)
// creates a Pipe[IO, ProducerRecords[String, String, CommittableOffset[IO]], ProducerResult[String, String, CommittableOffset[IO]]]

Each producer record is passed to the KafkaProducer and pushed to Kafka, and a ProducerResult comes out. This seems magical, but that’s the power of lambda expressions and Scala’s conciseness.

Line 32 extracts the CommittableOffset from the ProducerResult, and then we apply another transformation: a Stream[IO, CommittableOffset[IO]] is transformed into a Stream[IO, Unit]. But wait. What happened to our CommittableOffset?

commitBatchWithin(500, 15.seconds)
// creates a Pipe[F, CommittableOffset[F], Unit]
// commits offsets in batches of n offsets or within a time window of length d, whichever happens first

So, lines 21–33 are a series of operations on streams. We start by reading records from a Kafka topic and transform them into records that can be pushed back to Kafka. We push them to Kafka and, as we do that, we commit the offsets that were read. Of course, a stream is only a data structure that declares our intent. In line 35 we compile the Stream into a value of IO[Unit], and the IOApp takes care of running our stream.
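Assuming stream refers to the value built in the sketch above, that last step is simply:

// Nothing runs until the stream is compiled into an effect and that effect is executed by IOApp.
val program: IO[Unit] = stream.compile.drain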

Hopefully, after reading all this, you understand how to use FS2 Kafka to consume data from and produce data to Kafka using FS2 streams. In the next part of this series we will look at how to use all of this outside an IOApp; more specifically, how to use the FS2 Kafka producer when we do not have a stream of input data and the data arrives non-deterministically.

Further readings

  1. Apache Kafka — https://kafka.apache.org/
  2. FS2 — https://github.com/typelevel/fs2
  3. Data Types and Serialization — https://kafka.apache.org/10/documentation/streams/developer-guide/datatypes
  4. Kafka consumer offset management — https://docs.confluent.io/platform/current/clients/consumer.html#offset-management
