Kafka provides a fair amount of configuration and pluggability out of the box. With configuration settings, you can control many features on the producer, broker, and consumer. Beyond simple settings, further customization can be found in several pluggable interfaces.
Looking at the producer side in particular, things like synchronous vs. asynchronous dispatch, flush timing, and compression are easily controlled with configuration settings. Taking the next step, several simple interfaces provide customizability around how generic types are converted to Kafka messages before they are sent to the broker. A callback interface (kafka.producer.async.CallbackHandler) also allows for recording or customizing data as it moves through the various stages of the AsyncProducer.
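For instance, a handful of properties cover most of the async producer's tuning. A sketch of such a configuration (property names from the 0.7-era producer config; worth checking against the documentation for your version):

```properties
# sync vs. async dispatch
producer.type=async
# how long (ms) messages may sit in the async queue before a flush
queue.time=5000
# messages per batch when flushing
batch.size=200
# 0 = no compression, 1 = gzip
compression.codec=1
```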
If you need still more control and are willing to take more responsibility, the kafka.producer.async.IEventHandler interface provides the hook-point. Implementing this interface is a bit lower level as you’re dealing with batching and sending to the broker yourself. Let’s take a look at an example to see what’s involved.
The standard AsyncProducer does a good job of batching the queued messages by topic and broker, but the result is still a single kafka.message.Message for each item you’ve queued to the producer. Suppose instead you want to batch all messages for a particular topic/broker into a single Message. In this example we’ll simply concatenate the original string messages into a comma-delimited string that goes to the broker.
The interface to implement, assuming you’re doing this in Java, is kafka.javaapi.producer.async.EventHandler<T>. The primary method of interest is:
public void handle(List<QueueItem<String>> items, SyncProducer producer, Encoder<String> encoder)
This method is called for each batch and is responsible for getting the QueueItems sent to the broker. The underlying SyncProducer is passed in, so the actual sending is straightforward. Note that the String type parameter for QueueItem will vary based on the type your application posts to the producer.
Here is what an implementation of that might look like:
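The original listing isn't reproduced here, but the flow can be sketched with simplified stand-in types (QueueItem is a plain class, and the encoded Message is just a String, so no Kafka classes are needed). The real class would implement kafka.javaapi.producer.async.EventHandler&lt;String&gt; and finish by sending each request through the SyncProducer passed to handle():

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of the handler's flow, with stand-in types in place of the Kafka classes.
class BatchingHandlerSketch {
    static final class QueueItem {
        final String topic; final int partition; final String data;
        QueueItem(String topic, int partition, String data) {
            this.topic = topic; this.partition = partition; this.data = data;
        }
    }

    // Group the queued items by destination, then collapse each group into
    // a single comma-delimited payload -- one payload per destination.
    static Map<String, String> handle(List<QueueItem> items) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (QueueItem item : items) {
            String dest = item.topic + "-" + item.partition;
            grouped.computeIfAbsent(dest, k -> new ArrayList<>()).add(item.data);
        }
        Map<String, String> payloads = new LinkedHashMap<>();
        grouped.forEach((dest, msgs) -> payloads.put(dest, String.join(",", msgs)));
        return payloads; // the real handler would build requests and send them
    }
}
```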
Simple enough, but the interesting behavior is in the helper methods. First off, we have a simple value class Destination that helps us map topics to partitions on the broker:
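A value class along these lines (a sketch, not the original listing) needs little more than the two fields plus equals/hashCode so it can serve as a Map key:

```java
// Pairs a topic with a partition; equals/hashCode make it usable as a Map key.
final class Destination {
    final String topic;
    final int partition;

    Destination(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Destination)) return false;
        Destination d = (Destination) o;
        return partition == d.partition && topic.equals(d.topic);
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }
}
```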
Next is our helper that groups each of the QueueItems by the destination to which it is being sent:
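A sketch of that grouping helper, with simplified stand-ins for the Kafka types (QueueItem is a plain class here, and Destination is the value class just described, reduced to its essentials):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class GroupingSketch {
    static final class Destination {
        final String topic; final int partition;
        Destination(String topic, int partition) { this.topic = topic; this.partition = partition; }
        @Override public boolean equals(Object o) {
            return o instanceof Destination && ((Destination) o).topic.equals(topic)
                    && ((Destination) o).partition == partition;
        }
        @Override public int hashCode() { return 31 * topic.hashCode() + partition; }
    }

    static final class QueueItem {
        final String topic; final int partition; final String data;
        QueueItem(String topic, int partition, String data) {
            this.topic = topic; this.partition = partition; this.data = data;
        }
    }

    // Walk the queued items once, filing each under its destination.
    static Map<Destination, List<QueueItem>> groupByDestination(List<QueueItem> items) {
        Map<Destination, List<QueueItem>> grouped = new HashMap<>();
        for (QueueItem item : items) {
            Destination dest = new Destination(item.topic, item.partition);
            grouped.computeIfAbsent(dest, k -> new ArrayList<>()).add(item);
        }
        return grouped;
    }
}
```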
Again, nothing too surprising here. Now that we have them grouped by destination, this is where we’ll deviate from the standard behavior. The createRequests method is responsible for creating the actual requests that go to the broker (through the SyncProducer).
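In sketch form, createRequests looks something like the following (stand-in types again: Destination is a bare topic/partition pair, and Request stands in for the Kafka producer request). The key deviation is that each destination yields exactly one request whose payload is the concatenation of that destination's messages:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class RequestBuildingSketch {
    static final class Destination {
        final String topic; final int partition;
        Destination(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    static final class Request {
        final String topic; final int partition; final String payload;
        Request(String topic, int partition, String payload) {
            this.topic = topic; this.partition = partition; this.payload = payload;
        }
    }

    // One request, carrying one concatenated message, per destination.
    static List<Request> createRequests(Map<Destination, List<String>> grouped) {
        List<Request> requests = new ArrayList<>();
        for (Map.Entry<Destination, List<String>> entry : grouped.entrySet()) {
            Destination dest = entry.getKey();
            requests.add(new Request(dest.topic, dest.partition,
                                     String.join(",", entry.getValue())));
        }
        return requests;
    }
}
```

The real method would hand the resulting list to the SyncProducer rather than return it.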
Notice the granularity of the original messages as they relate to a producer request and Kafka message. Multiple QueueItems are put into a single Kafka message as seen in the getKafkaMessage() method that concatenates all of the original messages for a destination into a single comma-delimited string:
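The concatenation itself is trivial; a sketch with String standing in for kafka.message.Message:

```java
import java.util.Collections;
import java.util.List;

class ConcatSketch {
    // All of a destination's payloads collapse into one comma-delimited
    // message, returned as a one-element list to match the List<Message>
    // shape the caller expects.
    static List<String> getKafkaMessage(List<String> payloads) {
        return Collections.singletonList(String.join(",", payloads));
    }
}
```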
While this method returns a List<Message>, that is merely a convenience for the caller; it only ever adds the single concatenated message. Also note that we get the Encoder and the ProducerConfig in the callback, which allows us to honor those settings.
Finally, the EventHandler is passed to the Producer when it is constructed, either through a direct constructor call or by setting the “event.handler” producer property.
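Wired up via the property, that might look like the following sketch (the handler class name com.example.BatchingEventHandler is made up for illustration):

```java
import java.util.Properties;

// Registering a custom handler via the "event.handler" producer property.
class ProducerWiring {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("producer.type", "async");
        // Fully qualified name of the custom EventHandler implementation
        // (hypothetical class name for this sketch):
        props.put("event.handler", "com.example.BatchingEventHandler");
        return props;
    }
}
// The Properties would then be handed to the producer, e.g.:
//   new Producer<String, String>(new ProducerConfig(props));
```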
As is usually the case with examples, this one does not implement all of the standard functionality and does little error checking, in the interest of clarity. The standard handler also supports features such as per-topic compression, so you’ll want to account for those if you’re taking advantage of them.