FS2-Kafka in the real world - Part 2
This is a continuation of the series on how to use FS2-Kafka in the real world. Check out Part 1 to understand what FS2-Kafka is and how it is ideally used.
The FS2-Kafka documentation gives examples of how to use a KafkaProducer to produce data. All of those examples assume that there is an existing stream of data from which records will be pushed to Kafka.
This raises a question: what if we want to push data to Kafka when there is no source stream? Consider this example.
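A minimal sketch of such a service, assuming cats-effect 3 on the classpath. OrganisationId and OrganisationRepository are hypothetical names introduced here for illustration and are not taken from the original code.

```scala
import cats.effect.IO

// Hypothetical domain type; the article's own definition is not shown.
final case class OrganisationId(value: String)

// Hypothetical persistence layer: yields true when a row was actually deleted.
trait OrganisationRepository {
  def delete(id: OrganisationId): IO[Boolean]
}

// A plain service with no Kafka integration yet.
class OrganisationService(repository: OrganisationRepository) {
  def delete(id: OrganisationId): IO[Boolean] =
    repository.delete(id)
}
```

Nothing in this service produces a stream, which is why it cannot be plugged into an FS2 KafkaProducer directly.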
This is a simple service that deletes an Organisation entity from the database. What if we wanted to publish a Kafka event when an organisation is deleted successfully? We could use the native Java KafkaProducer, but what if we wanted to use the FS2 KafkaProducer? We need something to link our OrganisationService and the FS2 KafkaProducer.
We will use a Queue for this purpose. This idea is also expressed in the official FS2 documentation: https://fs2.io/#/guide?id=talking-to-the-external-world. Let's create two methods, enqueue and dequeue, that push data into and pull data out of our Queue.
The EventStream class is made with one goal: use any externalDataSource to publish data to Kafka. It does so by internally using a Queue. The enqueue method pulls data from the external stream and pushes it into the queue. The deqWithKafka method pulls elements from the queue, creates Kafka records and pushes them to Kafka using the supplied KafkaProducer.
The most critical part is linking the two methods in the stream method. A stream of KafkaProducer is created and so is a stream of Queue. We must use flatMap to access the producer and the actual queue and to ensure only one instance of each is created. The queue and producer are passed to the enqueue and dequeue methods to create the streams. The two streams are then started concurrently to ensure that both steps happen.
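The class described above could be sketched roughly as follows. This is an illustration under stated assumptions, not the article's original code: it assumes cats-effect 3, fs2 3.x and a matching fs2-kafka release (older fs2 releases expose the queue as fs2.concurrent.Queue instead), String keys and values, a caller-supplied topic name, and the event reused as the record key.

```scala
import cats.effect.Async
import cats.effect.std.Queue
import cats.syntax.all._
import fs2.Stream
import fs2.kafka.{KafkaProducer, ProducerRecord, ProducerRecords, ProducerSettings}

// Assumption: events are plain Strings; the topic name comes from the caller.
class EventStream[F[_]: Async](
    externalDataSource: Stream[F, String],
    settings: ProducerSettings[F, String, String],
    topic: String
) {

  // Pull every element from the external source and push it into the internal queue.
  private def enqueue(queue: Queue[F, String]): Stream[F, Unit] =
    externalDataSource.evalMap(queue.offer)

  // Pull elements from the internal queue, wrap each in a record and send it to Kafka.
  private def deqWithKafka(
      queue: Queue[F, String],
      producer: KafkaProducer[F, String, String]
  ): Stream[F, Unit] =
    Stream
      .fromQueueUnterminated(queue)
      // Assumption: the event doubles as the record key.
      .map(event => ProducerRecords.one(ProducerRecord(topic, event, event)))
      // produce returns F[F[result]]; flatten also waits for the send to complete.
      .evalMap(records => producer.produce(records).flatten.void)

  // The for-comprehension desugars to flatMap, so exactly one producer and one
  // queue are created and shared by both inner streams.
  def stream: Stream[F, Unit] =
    for {
      producer <- KafkaProducer.stream(settings)
      queue    <- Stream.eval(Queue.unbounded[F, String])
      _        <- deqWithKafka(queue, producer).concurrently(enqueue(queue))
    } yield ()
}
```

With concurrently, the enqueue side runs in the background of the dequeue side, so neither step blocks the other.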
Now the question is how to use the EventStream class to push delete events. We again need a Queue to link them.
We create an unbounded Queue, although in a real system we would care about memory usage and probably choose a bounded queue of suitable capacity. We create an instance of EventStream using the newly created queue and then start the stream. We also pass the newly created queue to the delete method. Here is how the delete method would use the queue.
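One way the wiring and the delete method might look, again assuming cats-effect 3 and fs2 3.x. To keep the sketch runnable without a broker, EventStream here is a stand-in that prints each event instead of producing it to Kafka, and the database call is a stub; both are assumptions made for illustration.

```scala
import cats.effect.{IO, IOApp}
import cats.effect.std.Queue
import fs2.Stream

// Stand-in for the Kafka-backed EventStream: prints instead of producing a record.
class EventStream(externalDataSource: Stream[IO, String]) {
  def stream: Stream[IO, Unit] =
    externalDataSource.evalMap(event => IO.println(s"would publish: $event"))
}

class OrganisationService {
  // Hypothetical database call, stubbed so that it always succeeds.
  private def deleteFromDatabase(id: String): IO[Boolean] = IO.pure(true)

  // Push an event onto the shared queue only when the delete succeeded.
  def delete(id: String, queue: Queue[IO, String]): IO[Boolean] =
    deleteFromDatabase(id).flatTap(deleted => IO.whenA(deleted)(queue.offer(id)))
}

object Main extends IOApp.Simple {
  def run: IO[Unit] =
    for {
      // Unbounded for brevity; Queue.bounded(capacity) would cap memory use.
      queue <- Queue.unbounded[IO, String]
      eventStream = new EventStream(Stream.fromQueueUnterminated(queue))
      service = new OrganisationService
      // take(2) exists only so this demo terminates; a real stream runs until cancelled.
      fiber <- eventStream.stream.take(2).compile.drain.start
      _ <- service.delete("org-1", queue)
      _ <- service.delete("org-2", queue)
      _ <- fiber.join
    } yield ()
}
```

The sketch starts the stream on a background fiber rather than calling unsafeRunSync(), because unsafeRunSync() blocks the calling thread until the stream ends.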
Every time a delete happens we push an element to the queue. The Stream created inside the EventStream class will ensure that every element pushed to this common queue is
1. pulled and then pushed to an internal queue inside the EventStream
2. pulled again as soon as it is enqueued in the internal queue, turned into a producer record, and pushed to Kafka
Steps 1 and 2 will keep happening forever because of the eventStream.stream.compile.drain.unsafeRunSync() call we made earlier, which keeps the stream running.
And that is it. We have successfully linked our external delete events to a Stream that is used to push data into Kafka. Hopefully this helps you integrate FS2-Kafka into more code bases.