FS2-Kafka in real world -Part 2
This is a continuation of series on how to use F2-Kafka in real world. Checkout part1 to understand what is F2-Kafka and how it can be used ideally.
FS2-Kafka documentation gives examples on how to use a KafkaProducer to produce data. All examples assume that there is an existing stream of data from where data will be pushed to Kafka.
This raises a question. What if we want to push data in Kafka when there is no source stream? Consider this example
A simple service that deletes a Organisation
entity from database. What if we wanted to publish a Kafka event when a organisation is deleted successfully? We could use the native java KafkaProducer but what if we wanted to use the FS2 KafkaProducer. We need something to link our OrganisationService
and the FS2 KafkaProducer
stream
We will use a Queue
for this purpose. This idea is also expressed in the official FS2 documentation — https://fs2.io/#/guide?id=talking-to-the-external-world. Let’s create two methods enqueue
and dequeue
that push and pull data into our Queue
.
EventStream
class is made with one goal — use any externalDataSource
to publish data to Kafka. It does that so by internally using a Queue
. enqueue
method pulls data from external stream and pushes them into a queue. The deqWithKafka
method basically pulls elements from the queue, creates Kafka records and pushes them into Kafka using the supplied KafkaProducer
.
The most critical part is linking the two methods in the stream
method. A stream of KafkaProducer
is created and so is a stream of Queue
. We must use flatMap to access the producer and the actual queue and to ensure only one instance of each is created. The queue and producer are passed to the enqueue
and dequeue
methods to create the streams. The two streams are then started concurrently
to ensure that both steps happen.
Now the question is how to use EventStream
class to push delete events? We again need a Queue
to link them.
We create an unbounded Queue
. Although in real we would care about the memory usage and probably choose a queue of suitable length. We create an instance of EventStream
using the newly created queue and then we start the stream. We also pass the newly created queue to the delete method. Here is how the delete method would use the queue.
Every time a delete happens we push an element to queue. The Stream
created inside EventStream
class will ensure that every element pushed to this common queue is
- pulled and then pushed to an internal queue inside the
enqueue
method. - As soon as its
enqueued
in the internal queue its pulled again, a producer record is created and then the record is pushed to Kafka
Steps 1 and 2 will keep happening forever because of eventStream.stream.compile.drain.unsafeRunSync()
that we have done . This ensures that the stream keeps running.
And that is it. We have successfully managed to link our external delete events to a Stream
that is being used to push data into Kafka. Hopefully this helps you in integrating FS2-Kafka into more code bases.
Further reading