FS2-Kafka in the real world - Part 2

ayush mittal
Feb 11, 2022

This is a continuation of the series on how to use FS2-Kafka in the real world. Check out part 1 to understand what FS2-Kafka is and how it is typically used.

The FS2-Kafka documentation gives examples of how to use a KafkaProducer to produce data. All of the examples assume that there is an existing stream of data from which records will be pushed to Kafka.

This raises a question: what if we want to push data to Kafka when there is no source stream? Consider this example.

A simple service deletes an Organisation entity from the database. What if we wanted to publish a Kafka event whenever an organisation is deleted successfully? We could use the native Java KafkaProducer, but what if we wanted to use the FS2 KafkaProducer? We need something to link our OrganisationService to the FS2 KafkaProducer stream.
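For concreteness, here is a minimal sketch of such a service; the names OrganisationService and delete are assumptions made for this post:

```scala
import cats.effect.IO

// Hypothetical interface: delete an organisation and, on success,
// publish a Kafka event announcing the deletion.
trait OrganisationService {
  def delete(orgId: String): IO[Unit]
}
```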

We will use a Queue for this purpose. This idea is also expressed in the official FS2 guide: https://fs2.io/#/guide?id=talking-to-the-external-world. Let's create two methods, enqueue and deqWithKafka, that push data into and pull data out of our Queue.
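Here is a sketch of an EventStream class holding the two methods, assuming fs2 3.x with cats-effect 3 and fs2-kafka 2.x; the event type OrganisationDeleted, the topic parameter, and the constructor shape are assumptions made for this example:

```scala
import cats.effect.IO
import cats.effect.std.Queue
import fs2.Stream
import fs2.kafka._

// Hypothetical event payload for this example.
final case class OrganisationDeleted(orgId: String)

final class EventStream(
    externalDataSource: Stream[IO, OrganisationDeleted],
    settings: ProducerSettings[IO, String, String],
    topic: String
) {

  // Pull elements from the external source and push them into the
  // internal queue.
  private def enqueue(queue: Queue[IO, OrganisationDeleted]): Stream[IO, Unit] =
    externalDataSource.evalMap(queue.offer)

  // Pull elements from the queue, build producer records and publish
  // them with the supplied KafkaProducer.
  private def deqWithKafka(
      queue: Queue[IO, OrganisationDeleted],
      producer: KafkaProducer[IO, String, String]
  ): Stream[IO, Unit] =
    Stream
      .fromQueueUnterminated(queue)
      .evalMap { event =>
        val record = ProducerRecord(topic, event.orgId, s"deleted:${event.orgId}")
        producer.produce(ProducerRecords.one(record)).flatten.void
      }

  // The stream method, which links the two, is shown below.
}
```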

The EventStream class is built with one goal: use any externalDataSource to publish data to Kafka. It does so by internally using a Queue. The enqueue method pulls data from the external stream and pushes it into the queue. The deqWithKafka method pulls elements from the queue, creates Kafka records, and pushes them to Kafka using the supplied KafkaProducer.

The most critical part is linking the two methods in the stream method. A stream of KafkaProducer is created, and so is a stream of Queue. We must use flatMap to access the producer and the actual queue, and to ensure only one instance of each is created. The queue and producer are passed to the enqueue and deqWithKafka methods to create the streams. The two streams are then run concurrently so that both steps happen.
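Continuing the sketch, the stream method (inside the EventStream class above) might look like this:

```scala
// Inside the EventStream class sketched above.
// flatMap gives us the single producer and the single queue; the two
// pipelines are then run at the same time with concurrently.
def stream: Stream[IO, Unit] =
  KafkaProducer.stream(settings).flatMap { producer =>
    Stream.eval(Queue.unbounded[IO, OrganisationDeleted]).flatMap { queue =>
      enqueue(queue).concurrently(deqWithKafka(queue, producer))
    }
  }
```

A nice property of concurrently is that if either side fails, the other is interrupted instead of being left running.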

Now, how do we use the EventStream class to push delete events? We again need a Queue to link the two.

We create an unbounded Queue, although in a real application we would care about memory usage and probably choose a bounded queue of suitable capacity. We create an instance of EventStream using the newly created queue and then start the stream. We also pass the same queue to the delete method. Here is how the delete method would use the queue.
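A sketch, with a hypothetical OrganisationRepository standing in for the persistence layer:

```scala
import cats.effect.IO
import cats.effect.std.Queue

// Hypothetical persistence layer for this example.
trait OrganisationRepository {
  def delete(orgId: String): IO[Unit]
}

final class OrganisationServiceImpl(
    repository: OrganisationRepository,
    events: Queue[IO, OrganisationDeleted] // the shared queue
) extends OrganisationService {

  // Delete the entity; only if that succeeds, offer an event onto the
  // shared queue that EventStream is draining.
  def delete(orgId: String): IO[Unit] =
    repository.delete(orgId) *> events.offer(OrganisationDeleted(orgId))
}
```

And the wiring described above might look like this; the layer that actually invokes service.delete (an HTTP route, for example) is elided:

```scala
import cats.effect.IO
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global // IORuntime for the unsafe calls
import fs2.Stream
import fs2.kafka.ProducerSettings

object Main {
  def main(args: Array[String]): Unit = {
    val settings =
      ProducerSettings[IO, String, String].withBootstrapServers("localhost:9092")

    // The shared queue that links the service to the event pipeline.
    val queue = Queue.unbounded[IO, OrganisationDeleted].unsafeRunSync()

    val eventStream =
      new EventStream(Stream.fromQueueUnterminated(queue), settings, "organisation-events")

    // Stub repository so the sketch compiles; use your real one here.
    val repository = new OrganisationRepository {
      def delete(orgId: String): IO[Unit] = IO.unit
    }
    val service = new OrganisationServiceImpl(repository, queue)

    // ... start whatever calls service.delete(...) here ...

    // Block this thread on the pipeline; it keeps running forever.
    eventStream.stream.compile.drain.unsafeRunSync()
  }
}
```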

Every time a delete happens, we push an element onto the queue. The stream created inside the EventStream class ensures that every element pushed to this shared queue is:

  1. pulled and then pushed to an internal queue inside the enqueue method, and
  2. pulled again as soon as it lands in the internal queue, turned into a producer record, and pushed to Kafka.

Steps 1 and 2 will keep happening forever because of the eventStream.stream.compile.drain.unsafeRunSync() call we made. This ensures that the stream keeps running.

And that is it. We have successfully linked our external delete events to a stream that pushes data into Kafka. Hopefully this helps you integrate FS2-Kafka into more code bases.

Further reading

  1. https://fs2.io/#/guide?id=talking-to-the-external-world
  2. https://underscore.io/blog/posts/2018/03/20/fs2.html
