ZIO Kafka is a lean and expressive library for interacting with Kafka through a ZIO Streams-based interface.
At Ziverge, we're big fans of Apache Kafka for creating a messaging fabric between microservices. From replicating data using changelog topics, to distributing work to multiple consumers, Kafka is a robust and scalable data store for processing loads of data.
In this post, we'd like to introduce you to ZIO Kafka - a lean library that provides an elegant interface to the Kafka APIs based on ZIO and ZIO Streams. After a quick introduction, we'll show how you can easily create a consumer process that reads records from Kafka, chunks them up and writes them to files.
We're going to use some imports throughout this file, so for brevity's sake, we'll list them here:
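A plausible set of imports for the snippets in this post is sketched below. It assumes the package layout of ZIO 1.x and ZIO Kafka 0.8.0, and is an assumption rather than an exhaustive list:

```scala
// Assumed imports (ZIO 1.x and ZIO Kafka 0.8.0 package layout)
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.putStrLn
import zio.duration._
import zio.stream._
import zio.kafka.consumer._
import zio.kafka.serde._
import org.apache.kafka.clients.consumer.{ ConsumerRecord, ConsumerRecords, KafkaConsumer }
```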
Interfacing applications with Kafka using the standard consumer and producer libraries is not an easy task, particularly if we're looking for asynchronous, non-blocking processing that integrates well with ZIO. Consuming Kafka topics is an effectively infinite computation; at any point in time, more data could be written to those topics. Therefore, long-running applications that consume these topics must be structured as infinite loops.
For example, assuming we've constructed a Kafka consumer, here's what the consumption loop would look like:
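A minimal sketch of such a loop, using the plain Kafka client. The body of `processRecords` and the 50 ms poll timeout are illustrative assumptions:

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.{ ConsumerRecords, KafkaConsumer }

object BlockingLoop {
  // Hypothetical processing step: print every record and return how many were seen.
  def processRecords(records: ConsumerRecords[String, String]): Int = {
    records.forEach(r => println(s"${r.key}: ${r.value}"))
    records.count()
  }

  // Poll, process and commit, strictly one after the other, forever.
  def run(consumer: KafkaConsumer[String, String]): Unit =
    while (true) {
      val records = consumer.poll(Duration.ofMillis(50))
      processRecords(records)
      consumer.commitSync()
    }
}
```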
While great as an initial approach, there are two major downsides here. First, all of the processing steps are done synchronously. We will not commit our offsets before processing the current batch of records is done, and we will not fetch additional records before we're done committing the offsets. There's a strict happens-before relation between each of the steps here, and a complete lack of pipelining. That means that if processRecords is slow, we will see delays in polling the consumer.
Second, the infinite loop structure scales pretty badly for control flow that requires handling more than one batch of data - for example, aggregating batches of records up to a timeout, and once the timeout expires, only then performing the processing and committing. We can try to apply that to the above snippet to see how painful it is.
Let's factor out the poll and commit steps:
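One way this factoring could look, as a sketch: the loop owns polling and committing, and the caller only supplies `f`. Passing the consumer as a parameter and the 50 ms poll timeout are assumptions made for illustration:

```scala
import java.time.Duration
import org.apache.kafka.clients.consumer.{ ConsumerRecords, KafkaConsumer }

object FactoredLoop {
  // The loop decides when to poll and when to commit; `f` only ever sees one batch.
  def runConsumerLoop(consumer: KafkaConsumer[String, String])(
    f: ConsumerRecords[String, String] => Unit
  ): Unit =
    while (true) {
      val records = consumer.poll(Duration.ofMillis(50))
      f(records)
      // Commits unconditionally, even if `f` merely buffered the batch.
      consumer.commitSync()
    }
}
```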
With the structure of runConsumerLoop, it is clear that there's no good way for the f function to operate on more than one batch of records at a time. For that to happen, f must store the buffered batches in a mutable reference somewhere. Another problem immediately pops up with that though: even though f buffered some batches for later processing, runConsumerLoop would still commit those batches!
As an alternative approach to structuring our code, we could use streams of records: ZStream[Any, Throwable, ConsumerRecord[String, String]]. This way, we're inverting the control; the processing function can consume that stream and apply whatever strategy makes sense for processing those records:
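As a sketch of that inversion, the function below takes the stream and decides on its own batching strategy. It assumes ZIO 1.x, where `groupedWithin` and `mapM` are available on `ZStream`; the println stands in for real processing:

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream.ZStream

object StreamProcessing {
  def processRecords(
    records: ZStream[Any, Throwable, ConsumerRecord[String, String]]
  ): ZStream[Clock, Throwable, Unit] =
    records
      // Emit a batch after 1000 records or 30 seconds, whichever comes first.
      .groupedWithin(1000, 30.seconds)
      // Placeholder for the real processing step.
      .mapM(batch => ZIO.effect(println(s"Processing a batch of ${batch.size} records")))
}
```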
I think you'd all agree that this declarative style of code is much more attractive compared to the previous loop. Beyond aesthetics, note that we only needed one line to aggregate the incoming records into batches of 1000 (with up to 30 seconds of waiting). That's the biggest win, by far!
Let's see how we set up an application to use ZIO Kafka. The current version of the library is built against the 1.0.0-RC18-2 version of ZIO. Add the library to your sbt build like so:
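Assuming the 0.8.0 release that this post was type-checked against, the dependency line would be:

```scala
// build.sbt
libraryDependencies += "dev.zio" %% "zio-kafka" % "0.8.0"
```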
First, we'll create the Kafka Consumer as a ZLayer, and provide it to our consumer stream:
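A sketch of what such a layer could look like, assuming the ZIO Kafka 0.8.0 API (`ConsumerSettings` builder methods and `Consumer.make`); signatures may differ in other versions. The broker address, group id and client id are placeholders:

```scala
import zio._
import zio.blocking.Blocking
import zio.clock.Clock
import zio.duration._
import zio.kafka.consumer._
import zio.kafka.serde._

object KafkaLayers {
  // Placeholder connection details; adjust for your environment.
  val consumerSettings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId("example-group")
      .withClientId("example-client")
      .withCloseTimeout(30.seconds)

  // Assumed shape in 0.8.0: a layer that needs Clock and Blocking and
  // produces a consumer with String keys and String values.
  val consumer: ZLayer[Clock with Blocking, Throwable, Consumer[Any, String, String]] =
    Consumer.make(consumerSettings, Serde.string, Serde.string)
}
```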
The individual operations on the consumer and producer can be accessed through accessor methods on the zio.kafka.consumer.Consumer and zio.kafka.producer.Producer objects. For example, this is how we'd create a stream that subscribes to a topic and consumes it:
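A sketch of such a stream, assuming the `Consumer.subscribeAnd` accessor and `plainStream` as they existed in ZIO Kafka 0.8.0. The topic name is a placeholder and the type is left to inference:

```scala
import zio.kafka.consumer._

object TopicStream {
  // Requires a Consumer[Any, String, String] in the environment (plus Clock and Blocking)
  // and emits committable records with String keys and values.
  val data =
    Consumer
      .subscribeAnd[Any, String, String](Subscription.topics("topic"))
      .plainStream
}
```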
The type signature tells us that this stream depends on a Kafka consumer that deals with strings. We can add some effects to the stream by printing every consumed value and committing the offsets:
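Roughly, printing and committing could be layered on as follows. This sketch assumes the ZIO Kafka 0.8.0 API (`Consumer.offsetBatches`, `commit` on an offset batch) and ZIO 1.x stream operators; the topic name is a placeholder:

```scala
import zio.console.putStrLn
import zio.kafka.consumer._

object PrintAndCommit {
  val stream =
    Consumer
      .subscribeAnd[Any, String, String](Subscription.topics("topic"))
      .plainStream
      // Print every record as it arrives.
      .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
      // Keep only the offsets, batch them up, and commit each batch.
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .mapM(_.commit)
}
```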
At this point, we can provide the Kafka Consumer layer to our stream and convert it to a ZManaged fiber. This ZManaged value, when run, will execute the stream in a fiber and interrupt the fiber when the ZManaged is released:
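The fiber-management part can be sketched independently of Kafka. The helper below is hypothetical (its name is not from the library) and assumes `toManaged_` and `ZManaged#fork` from ZIO 1.x:

```scala
import zio._
import zio.stream.ZStream

object Background {
  // Runs the stream to completion in a fiber; releasing the ZManaged interrupts that fiber.
  def runInBackground[R, E](stream: ZStream[R, E, Any]): ZManaged[R, Nothing, Fiber[E, Unit]] =
    stream.runDrain.toManaged_.fork
}
```

Using `toManaged_.fork` rather than forking inside an acquire action keeps the fiber interruptible, which matters for the release step.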
The provideCustomLayer function is available on most ZIO data types (ZIO, ZManaged, ZStream, etc.) and is extremely convenient when the environment requirements that are left after providing our layer are a subset of the standard ZEnv environment. In our case, the requirements that are left are Blocking and Clock, so we can use this handy form.
It is highly recommended to compose the main modules of your application as ZManaged values (or, equivalently, ZLayer values if you'd like to inject them as dependencies to other parts of your application). ZManaged values can be freely composed with other values using the flatMap, <*>, and ZManaged.collectAll combinators just like ZIO, with the added benefit of maintaining correct finalizer ordering.
For our application's entrypoint, we'll execute two of these streams, each one consuming a different topic. First, here's a function that does everything we've discussed so far:
The application is constructed as a ZManaged value that executes two streams, and yields a ZIO value that can be used to join all of the streams. We're doing this to make sure that our main fiber exits if one of the background fibers exits or fails. We can execute that ZIO value inside ZManaged#use, which we immediately call:
Ok! That's a good skeleton for our application. We can now proceed to writing the actual logic which batches records and writes them to files.
For our example, we'll write a consumer that groups incoming records into batches of 16kB or 4096 records, while waiting for up to 30 seconds for those conditions to be fulfilled.
We will package this functionality up in a ZIO module that provides a stream transformer function - that's a function of the form ZStream[R, E, A] => ZStream[R, E, B]. Our module will also keep a running counter of the number of files that have been written; that way the application can expose some runtime metrics.
To start, we will specify our interface and accessors:
The ZStream.accessStream function is a handy utility for accessing functions on the environment that return streams. It is most commonly used when defining accessors for your modules.
Next, let's write the implementation for the module:
Whew, a lot to unpack here! Let's walk through it step by step. First, we have the definition for the ZSink named batchRecords. A complete tutorial of ZSink is beyond the scope of this article, but a good intuition is that sinks are composable aggregators of values. This sink implements a weighted fold: it's like a normal fold, but it tracks the cost of the aggregated value using a cost function defined on the elements. This sink will aggregate records into a list for as long as the list has less than 16kB of records.
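To build intuition for the weighted fold, here is a plain-Scala sketch of the idea over an in-memory list. It is not the ZSink implementation, and `batchByCost` is a hypothetical helper whose boundary behaviour may differ from the library's:

```scala
object WeightedFold {
  // Groups elements into batches whose total cost stays within `max`.
  // An element that would push the current batch over the limit starts a new batch.
  def batchByCost[A](elements: List[A], max: Long)(cost: A => Long): List[List[A]] = {
    val (batches, current, _) =
      elements.foldLeft((List.empty[List[A]], List.empty[A], 0L)) {
        case ((done, acc, total), a) =>
          val c = cost(a)
          if (acc.nonEmpty && total + c > max) (acc.reverse :: done, List(a), c)
          else (done, a :: acc, total + c)
      }
    // Elements are prepended while folding, so each batch is reversed when emitted.
    (if (current.nonEmpty) current.reverse :: batches else batches).reverse
  }
}
```

For example, `WeightedFold.batchByCost(List("aa", "bbb", "c", "dddd"), 5)(_.length.toLong)` yields `List(List("aa", "bbb"), List("c", "dddd"))`.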
We're also applying ZSink#map on the sink: once the sink reaches 16kB of records and yields its result (the list of records), we're applying a function to that result. In this case, we're reversing the list, concatenating the string and extracting the UTF-8 bytes of the data.
Next, we have the batchOffsets sink. This one is identical to a left fold on a list; we're aggregating the individual offsets on the records to a batch of offsets that can be committed at once.
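The same left-fold idea, sketched in plain Scala: keep the highest offset seen for each topic partition, so that a single commit covers the whole batch. `RecordOffset` is a hypothetical stand-in for the library's offset type, used only for illustration:

```scala
object OffsetFold {
  // Hypothetical, simplified stand-in for a Kafka record offset.
  final case class RecordOffset(topic: String, partition: Int, offset: Long)

  // Left fold that keeps the highest offset per (topic, partition).
  def batch(offsets: List[RecordOffset]): Map[(String, Int), Long] =
    offsets.foldLeft(Map.empty[(String, Int), Long]) { (acc, o) =>
      val key = (o.topic, o.partition)
      acc.updated(key, math.max(acc.getOrElse(key, -1L), o.offset))
    }
}
```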
We're using both of these sinks in the call to ZStream#aggregate. They are combined using the ZSink#zipPar function: this function creates a new sink that feeds the incoming elements into both sinks, ending as soon as one of the sinks signals completion. The resulting sink will yield a tuple of the two results. The ZStream#aggregate function will repeatedly apply the composite sink to the stream, resulting in a stream that consists of the aggregated results.
Finally, we're writing the chunks of data to files using ZStream#mapM and some standard JDK functions.
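The file-writing step on its own might look like the sketch below, which wraps `java.nio.file.Files.write` in a ZIO effect. The `writeChunk` helper and the file naming scheme are assumptions; a real application would likely run this on the blocking thread pool:

```scala
import java.nio.file.{ Files, Path }
import zio._

object ChunkWriter {
  // Writes one chunk of bytes to `dir/chunk-<index>.bin` and returns the resulting path.
  def writeChunk(dir: Path, index: Long, bytes: Array[Byte]): Task[Path] =
    ZIO.effect(Files.write(dir.resolve(s"chunk-$index.bin"), bytes))
}
```

A function like this can be passed to ZStream#mapM so that every aggregated chunk ends up in its own file.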
To use our module's live implementation, we want it to be available as a ZLayer. Here's how we can create a constructor for that (usually you'd want this available on the RecordChunking object as def live):
Ok! All done with the module. To use it, we can redefine the consumer stream we created earlier as such:
Note how we're now using the ZStream#via function, a convenience function for applying a function to the stream; and the ZStream#provideSomeLayer function, which allows us to provide some of the layers required by our stream. We need to aid the compiler's type inference by explicitly specifying which layers are left after providing.
Last up is our main entrypoint:
We deferred providing the record chunking layer up to the top-most level of our application so it is shared between the streams.
That's it! The complete example is available at https://github.com/zivergetech/zio-kafka-example-app. It also includes a docker-compose.yaml file that you can use to set up a local Kafka broker. The README.md contains some instructions on how to start the broker, write some data to the topics and run the application.
Enjoy, and join us on the ZIO Discord (https://discord.gg/2ccFBr4) if you have any follow-up questions!
This blog post was type-checked using mdoc against ZIO 1.0.0-RC18-2 and ZIO Kafka 0.8.0.