Custom Event Batching with Apache Kafka

Kafka provides a fair amount of configuration and pluggability out of the box. With configuration settings, you can control many features on the producer, broker, and consumer. Beyond simple settings, further customization can be found in several pluggable interfaces.

Looking at the producer side in particular, things like synchronous vs. asynchronous behavior, flush times, and compression are easily controlled with configuration settings. Taking the next step, several simple interfaces provide customizability around how generic types are converted to Kafka messages before sending to the broker. A callback interface (kafka.producer.async.CallbackHandler) also allows for recording or customizing data as it goes through the various stages of the AsyncProducer.

If you need still more control and are willing to take more responsibility, the kafka.producer.async.IEventHandler interface provides the hook-point. Implementing this interface is a bit lower level as you’re dealing with batching and sending to the broker yourself. Let’s take a look at an example to see what’s involved.

The standard AsyncProducer does a good job of batching the messages that have been queued up by topic and broker, but the result is a single kafka.message.Message for each item you’ve queued to the producer. Suppose instead you want to batch all messages for a particular topic/broker into a single Message. In this example we’ll simply concatenate the original string messages into a comma-delimited string that goes to the broker.

The interface to implement, assuming you’re doing this in Java, is kafka.javaapi.producer.async.EventHandler<T>. The primary method of interest is:

public void handle(List<QueueItem<String>> items, SyncProducer producer, Encoder<String> encoder)

This method is called for each batch and is responsible for getting the QueueItems sent to the broker. We see that the underlying SyncProducer is passed in so that should be easy enough. Note that the String type parameter for QueueItem will vary based on the type your application is posting to the producer.

Here is what an implementation of that might look like:

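The original listing isn't reproduced here, but a minimal, runnable sketch of the handle() flow might look like the following. It uses stand-in types in place of the real kafka.* classes (QueueItem, SyncProducer, and ProducerRequest in the actual API), and the two-step structure of grouping by destination and then building one request per destination mirrors the description above; everything else is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Stand-in sketch of the custom EventHandler's handle() flow: group the queued
// items by (topic, partition), then collapse each group into a single
// comma-delimited payload that becomes one request per destination.
class BatchingHandlerSketch {

    // Stand-in for kafka.producer.QueueItem<String>.
    static class Item {
        final String data; final String topic; final int partition;
        Item(String data, String topic, int partition) {
            this.data = data; this.topic = topic; this.partition = partition;
        }
    }

    // Stand-in for kafka.producer.ProducerRequest: one per destination.
    static class Request {
        final String topic; final int partition; final String payload;
        Request(String topic, int partition, String payload) {
            this.topic = topic; this.partition = partition; this.payload = payload;
        }
    }

    // The equivalent of handle(): in real code the resulting requests would go
    // to the broker via the SyncProducer's multi-send.
    static List<Request> handle(List<Item> items) {
        Map<String, List<String>> grouped = new LinkedHashMap<>();
        for (Item item : items) {
            String dest = item.topic + "/" + item.partition;
            grouped.computeIfAbsent(dest, k -> new ArrayList<>()).add(item.data);
        }
        List<Request> requests = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
            String[] parts = e.getKey().split("/");
            requests.add(new Request(parts[0], Integer.parseInt(parts[1]),
                    String.join(",", e.getValue())));
        }
        return requests;
    }
}
```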

Simple enough, but the interesting behavior is in the helper methods. First off, we have a simple value class Destination that helps us map topics to partitions on the broker:

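A minimal sketch of such a value class, with field names assumed, might be: a (topic, partition) pair that implements equals and hashCode so it can key the Map used for grouping.

```java
// Value class pairing a topic with a partition; equals/hashCode make it usable
// as a Map key when grouping QueueItems by destination.
class Destination {
    final String topic;
    final int partition;

    Destination(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Destination)) return false;
        Destination d = (Destination) o;
        return partition == d.partition && topic.equals(d.topic);
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }
}
```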

Next is our helper that groups each of the QueueItems by the destination to which it is being sent:

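The helper amounts to a single pass over the queued items, bucketing each by its destination. In this sketch, QueueItem is a stand-in for kafka.producer.QueueItem&lt;String&gt; and the compact Destination mirrors the value class described above; the real Gist may have differed in the details.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the grouping helper: one Map entry per (topic, partition), each
// holding the items bound for that destination.
class Grouper {

    // Stand-in for kafka.producer.QueueItem<String>.
    static class QueueItem {
        final String data; final String topic; final int partition;
        QueueItem(String data, String topic, int partition) {
            this.data = data; this.topic = topic; this.partition = partition;
        }
    }

    static class Destination {
        final String topic; final int partition;
        Destination(String topic, int partition) {
            this.topic = topic; this.partition = partition;
        }
        @Override public boolean equals(Object o) {
            return o instanceof Destination
                    && ((Destination) o).partition == partition
                    && ((Destination) o).topic.equals(topic);
        }
        @Override public int hashCode() { return 31 * topic.hashCode() + partition; }
    }

    static Map<Destination, List<QueueItem>> groupByDestination(List<QueueItem> items) {
        Map<Destination, List<QueueItem>> grouped = new HashMap<>();
        for (QueueItem item : items) {
            Destination dest = new Destination(item.topic, item.partition);
            grouped.computeIfAbsent(dest, k -> new ArrayList<>()).add(item);
        }
        return grouped;
    }
}
```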

Again, nothing too surprising here. Now that we have them grouped by destination, this is where we’ll deviate from the standard behavior. The createRequests method is responsible for creating the actual requests that go to the broker (through the SyncProducer).

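A sketch of that step, with stand-in types: Dest and Request stand in for the Destination value class and kafka.producer.ProducerRequest (the real request wraps a ByteBufferMessageSet rather than a raw String). The key point is that exactly one request, carrying one concatenated message, comes out per destination.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Sketch of createRequests: this is where the batching granularity changes
// from one-message-per-item to one-message-per-destination.
class RequestBuilder {

    static class Dest {
        final String topic; final int partition;
        Dest(String topic, int partition) { this.topic = topic; this.partition = partition; }
    }

    static class Request {
        final String topic; final int partition; final String message;
        Request(String topic, int partition, String message) {
            this.topic = topic; this.partition = partition; this.message = message;
        }
    }

    // One request per map entry, with all of that destination's payloads
    // collapsed into a single comma-delimited message.
    static List<Request> createRequests(Map<Dest, List<String>> grouped) {
        List<Request> requests = new ArrayList<>();
        for (Map.Entry<Dest, List<String>> e : grouped.entrySet()) {
            requests.add(new Request(e.getKey().topic, e.getKey().partition,
                    String.join(",", e.getValue())));
        }
        return requests;
    }
}
```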

Notice the granularity of the original messages as they relate to a producer request and Kafka message. Multiple QueueItems are put into a single Kafka message as seen in the getKafkaMessage() method that concatenates all of the original messages for a destination into a single comma-delimited string:

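The concatenation itself is the simplest part. A sketch, working on plain strings: in the real code the joined string would be run through the Encoder to produce a kafka.message.Message, and the single-element List matches the caller-convenience shape discussed next.

```java
import java.util.Collections;
import java.util.List;

// Sketch of getKafkaMessage: one Kafka message per destination, no matter how
// many items were queued for it.
class MessageBuilder {
    static List<String> getKafkaMessage(List<String> originals) {
        // Concatenate every original message into a single comma-delimited payload.
        return Collections.singletonList(String.join(",", originals));
    }
}
```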

While this method returns a List<Message>, this is merely a convenience for the caller–it will only ever add the single message. Also, note that we get the Encoder and the ProducerConfig in the callback, which allows us to honor those settings.

Finally, the EventHandler is passed to the Producer when it’s constructed either through a direct constructor call or by setting the “event.handler” producer property.
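The property route might look like the sketch below. The "event.handler" key and the producer.type switch come straight from the text; the handler class name is hypothetical, and the full set of required properties (broker list, serializer, and so on) depends on the Kafka version in use.

```java
import java.util.Properties;

// Sketch of wiring a custom EventHandler in by producer property.
class ProducerWiring {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("producer.type", "async");  // use AsyncProducers underneath
        props.put("event.handler", "com.example.BatchingEventHandler");  // hypothetical class
        return props;
    }
}
```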

As is the case with examples, this one does not implement all functionality and does little error-checking in the interest of clarity. The standard behavior includes additional functionality such as compression on a per-topic basis, so you’ll want to pay attention to that if you’re taking advantage of those features.



Understanding the Kafka Async Producer

We’ve been talking recently about our use of Apache Kafka–a system that takes a different view of implementing messaging. Because we’re using Kafka in a high-volume area of our system, we’ve had to spend time understanding how to get messages through reliably at high rates. While we’ve spent time on both the producing and consuming side of the brokers, this post focuses on the producing side only.

At a high level Kafka offers two types of producers–synchronous and asynchronous. As a user of the producer API, you’re always interacting with the kafka.producer.Producer class, but it’s doing a fair bit for you under-the-covers depending on how you’ve configured it.

By default the Producer uses SyncProducers underneath. SyncProducer does what it sounds like–it sends the messages to the broker synchronously on the thread that calls it. In most high volume environments this obviously isn’t going to suffice. It’s too many network sends on too many small-ish messages.

By setting the producer.type flag to ‘async’, the Producer class uses AsyncProducers under the covers. AsyncProducer offers the capability not only to do the sends on a separate thread, but to batch multiple messages together into a single send. Both characteristics are generally desirable–isolating network I/O from the threads doing computation and reducing the number of network messages into a smaller number of larger sends. Introductory information on the producers can be found in the Kafka documentation.

An important note is that Kafka supports compressing messages transparently such that neither your producer nor your consumer code has to do anything special. When configured for compression (via the compression.codec producer config setting), Kafka automatically compresses the messages before sending them to the broker. Importantly, this compression happens on the same thread that performs the send.
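Put together, the two settings discussed here look like the sketch below. The codec value shown (1 for GZIP in the early config scheme) is an assumption; check the documentation for the version you're running.

```java
import java.util.Properties;

// Sketch of producer settings for async sends with transparent compression.
class AsyncCompressionConfig {
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("producer.type", "async");  // batch and send on a background thread
        props.put("compression.codec", "1");  // compress before the send, on the send thread
        return props;
    }
}
```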

Because compressing and sending are both relatively time-consuming operations, they’re an obvious place you may want to parallelize. But how parallel is the Producer?

A producer configured for async mode creates a separate underlying AsyncProducer instance for each broker via its internal ProducerPool class. Each AsyncProducer has its own associated background thread (named ProducerSendThread) that does the work for the send. In other words, there’s one thread per broker so your resulting parallelism for compressing and sending messages is based on the number of brokers in the cluster.

Here’s a snippet of the threads in an application with a single broker:


And here’s what they look like under the same producer load with 3 brokers in the cluster:


In the first image you can see that the single thread is doing more work than when it’s spread out, as you would expect.

A common question is “Why am I seeing QueueFullExceptions from the producer?” This often means the ProducerSendThreads are saturated and unable to keep up with the rate of messages going into the Producer. An obvious approach to deal with this is adding brokers to the cluster and thus spreading the message sends across more threads, as well as lightening the load on each broker.

Adding brokers to the cluster takes more consideration than just how efficient the producers are. Clearly there are other approaches you could take such as building a different async model on top of the SyncProducer that’s not dependent on the number of brokers. But it’s useful to know what the options are, and in the end adding brokers may be useful for your architecture in other ways.

If the topology of your producer applications is such that there are many applications producing a relatively small number of messages then none of this may be of concern. But if you have a relatively small number of producer applications sending many large messages, understanding how to spread out that work can be of real importance.

Welcome to Gnip’s Engineering Blog

One thing you learn in a startup is to never get too tied to where you are today. You’re trying to find a new market or serve an existing one in a new way. Not only is the business bound to change over time, but the technology as well. As an introduction to our Engineering blog, here’s a brief history of the evolution of Gnip’s technology.

When Gnip first started it set out to “solve the polling problem”. The world of public web APIs was younger than it is now, social media itself younger still, and the business of social media data barely in its infancy. The vast, vast majority of infrastructure was built around HTTP polling APIs. The amount of data available wasn’t nearly as large as it is today (to make a monumental understatement), so the polling APIs were generally adequate for most uses.

While relatively straightforward to implement and consume, polling APIs have obvious inefficiencies on both ends. Gnip’s first system (well, technically second) was largely built to turn polling APIs into push APIs. It gathered data through various means like polling, sent the data through a centralized pipeline, and finally filtered and fanned out the data to all consumers. This stage of the technology was characterized by optimizing polling APIs, both in the way we gathered data and the way we sent it to consumers. Sure, we got some data via other mechanisms like XMPP as well, but request/response APIs ruled, so they took up much of our time.

At the same time we offered a more efficient push API to our consumers via WebHooks, but many still wanted to use polling because of its relative ease and certain advantages such as easier firewall navigation. This system, known these days as “the 1.0 system”, was a centralized, clustered system written largely in Java with some Python, Ruby and other languages thrown in.

Back then the market for social media data wasn’t nearly as well defined as it is today. As the marketplace evolved it became clear a change was needed. Publishers wanted to have visibility into who was hitting their APIs, not be shielded by a third party. They had invested in their APIs and could handle the traffic of polling by individual consumers just fine.

Enter the second phase of Gnip’s technology–the appliance model.

Instead of a centralized system gathering data once for all customers, the appliance model accepted the overall inefficiency and had each customer get the data from the source API separately. Built with the well-known technology stack of Ruby, Rails, and MySQL, each customer got its own EC2 instance on which Gnip’s software collected data on their behalf, and only their behalf. No data was shared between customers. Many of the lessons learned in the 1.0 system applied–polling most efficiently and predictably while honoring the publisher’s rate limits and other Terms of Service. Beyond that, other, more efficient techniques of delivering data such as WebHooks and Streaming HTTP were starting to gain traction and were included in the appliance implementation.

Nothing stands still though, and the business environment continued to evolve to where publishers saw growing demand for the data for legitimate commercial and other uses. Partnerships were formed, requirements around quality of service got stronger, and volumes continued to grow. We put a lot into the appliances to make them reliable and easy to use, but ultimately, as a single-instance solution, they have their limits in capacity. Volumes from the largest publishers were growing beyond them.

So here we are at phase three–a centralized Java system with a healthy dose of Ruby. It’s come full circle in some ways, though with many evolutionary improvements over the 1.0 system. And much larger scale. Some of those more efficient models of consumption are core to this system and allow it to handle far more data. Ruby applications play an important and larger role than in the 1.0 system, so we’ve ended up with a great balance of tools in our toolbox. The appliances continue to serve more moderate volume use cases quite well with unique ease-of-use, insight, and control for the user.

Along the way we’ve learned plenty of lessons and benefited greatly as developers from the writing of others sharing their experiences. So here we’ll share our own in hopes it may help others and aid in our own continued evolution as well.