Bend, Don’t Break

This post explains how we solved a major problem: scaling our realtime data processing systems to handle ever-increasing volumes without drastically over-building. By using Kafka as a queuing system for the initial ingest, we’ve built a system that scales elegantly to handle unpredictable load and lets us “go wide” when we need to process higher-than-normal data volumes.

Drinking From The Firehose

At Gnip, we are continuously ingesting realtime firehoses of activities from multiple social media publishers (Twitter, Tumblr, and WordPress, to name a few). During breaking news stories or major events, the number of activities per second that we consume can spike significantly. We have seen, for instance, sustained rates upwards of 20,000 Tweets per second coming in from Twitter during the Super Bowl or on election night – more than twice the 6,000-8,000 Tweets per second we average these days.

So far, we have always scaled each of our components to handle the highest imaginable load. This presents a challenge, though, since the “highest imaginable load” increases continuously as use of social media grows. Three years ago we saw an average of 1,000 Tweets per second; now we average more than 6,000 Tweets per second, and peak volumes have grown in a similar fashion. Rather than continuously adjusting and planning for what we think the “highest imaginable” load might be, we’ve sought an approach that lets us handle whatever volumes come in the front door.

Bend, Don’t Break

Therefore, we have spent a lot of time and effort thinking about what we do when we get a spike that is higher than the highest imaginable load – say 50,000 Tweets a second. Who knows, perhaps the 2chan community will flash mob Twitter again, and we wouldn’t want to miss that. We want to make sure we handle such an event gracefully, which for us concretely means never dropping an activity or missing an enrichment. (When we add metadata to an activity – like a Klout score or an unwound URL – we call that an ‘enrichment’.)

So, inspired by a blog post on applying back pressure when overloaded (reference 2 below), we decided to provision our inbound (or edge) infrastructure that consumes the firehoses to handle anything the publishers could ever throw at us, while keeping the internal components more flexible. This allows us to guarantee an even higher level of reliability without spending exponentially more resources just to cover rare peak events.

Borrowing some football terminology, we call this the “Bend, but don’t break” strategy.

Kafka

The critical piece of open source software that allows us to do this easily is Kafka, since consumers can read from Kafka topics at their own pace; many other queuing systems are not as tolerant of slow consumers. Our internal components can apply back pressure and nothing is lost, as long as they don’t fall behind the window of data retained in the Kafka queues. (The retention period is limited only by how much you’d like to spend on drives.)
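
How large that window is comes down to broker-side retention settings. As a rough sketch (the property name is a standard Kafka broker setting; the value below is only an example to size against your disks):

# Kafka broker server.properties (example value only)
# Keep a week of data on disk so slow consumers have time to catch up.
log.retention.hours=168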

The consequence for downstream components is that spikes get somewhat smoothed out. The following picture, taken from a load test with a sustained volume of 25k Tweets/second, compares the volume coming from a publisher at the inbound edge of our infrastructure with the inbound volume seen by an internal component.

As you can see, the internal component handled 20k Tweets/second quite happily, but started applying back pressure at 25k Tweets/second and therefore took a couple of minutes to drain the backlog until it had completely caught up with realtime. This load test helped us identify the need to further scale that component to handle a possible 25k Tweets/second in realtime, so we increased capacity accordingly. But even though the spike was larger than expected, we did not experience any data loss or operational issues. Full data fidelity at all times is a major benefit of this approach and a requirement of our products.

Implementation

In order to make this work, we ensure that the apps never pull more work from Kafka than they can handle. For instance, in one step of our pipeline we extract from each activity the information required for calculating enrichments (e.g. the Tweet text for language classification) and then distribute that work to multiple worker hosts to parallelize the computation. The workers pull work from queues on a distributor app as fast as they can complete it.

Since we build our backend systems in Java, we leverage the functionality in java.util.concurrent quite extensively. For these queues we use LinkedBlockingQueue, so that if the workers can’t keep up, the threads on the distributor block until the queues get drained. It is just as important to size the blocking queues properly when they are created:

LinkedBlockingQueue<EnrichmentRequest> requestQueue =
       new LinkedBlockingQueue<EnrichmentRequest>(MAX_QUEUE_SIZE);

Since reading from Kafka happens on the same thread, we naturally apply back pressure. MAX_QUEUE_SIZE depends mostly on how much memory you have available and how quickly you want to apply back pressure.
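
To make that concrete, here is a minimal sketch of the distributor’s read loop. The Kafka consumer iterator and the request type are hypothetical stand-ins, since the actual classes aren’t shown in this post; the point is that the blocking put() is what propagates back pressure.

import java.util.Iterator;
import java.util.concurrent.BlockingQueue;

public class Distributor {
    // Hypothetical read loop: 'activities' stands in for the Kafka consumer
    // iterator, and T for the request type (e.g. EnrichmentRequest).
    static <T> void distribute(Iterator<T> activities, BlockingQueue<T> requestQueue)
            throws InterruptedException {
        while (activities.hasNext()) {
            // put() blocks once the queue is full, so this thread stops pulling
            // from Kafka until the workers drain the queue; the backlog then
            // accumulates safely in Kafka instead of in our heap.
            requestQueue.put(activities.next());
        }
    }
}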

One additional bit of work the distributor app has to do is to park activities for a couple of seconds in order to give the workers time to calculate the enrichments. Again, java.util.concurrent comes in handy for building a BlockingDelayQueue:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;

public class BlockingDelayQueue<T extends Delayed> {
    private final DelayQueue<T> delayQueue = new DelayQueue<T>();
    // Bounds the number of parked items; put() blocks once the permits run out.
    private final Semaphore capacitySemaphore;

    public BlockingDelayQueue(int capacity) {
        capacitySemaphore = new Semaphore(capacity);
    }

    public void put(T item) throws InterruptedException {
        capacitySemaphore.acquire();
        delayQueue.put(item);
    }

    public T take() throws InterruptedException {
        T item = delayQueue.take();
        capacitySemaphore.release();
        return item;
    }

    public int size() {
        return delayQueue.size();
    }
}

This class is built to support back pressure: if the queue is not getting drained, the application automatically backs off.
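
For illustration, an item type for such a queue only needs to implement Delayed. A minimal, hypothetical example (the class and field names are ours for this sketch, not from the actual codebase) might park an activity for a fixed number of milliseconds:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper that makes an activity available again after parkMillis.
public class ParkedActivity implements Delayed {
    private final String activityJson;
    private final long releaseAtMillis;

    public ParkedActivity(String activityJson, long parkMillis) {
        this.activityJson = activityJson;
        this.releaseAtMillis = System.currentTimeMillis() + parkMillis;
    }

    public String getActivityJson() {
        return activityJson;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(releaseAtMillis - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                other.getDelay(TimeUnit.MILLISECONDS));
    }
}

A BlockingDelayQueue<ParkedActivity> sized with MAX_QUEUE_SIZE then hands activities back to the pipeline only after their delay expires, by which time the workers have had a chance to compute the enrichments.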

Scalability

Apart from Kafka’s persistent queuing, the system also gains natural horizontal scalability out of the box: in Kafka each topic is spread across a configurable number of partitions, so internal components can be parallelized by having each instance consume only a subset of those partitions. Stay tuned to the blog for a deeper dive into what it takes to pump firehoses through Kafka and not miss a drop.

References

  1. http://kafka.apache.org

  2. http://mechanical-sympathy.blogspot.co.uk/2012/05/apply-back-pressure-when-overloaded.html

Custom Event Batching with Apache Kafka

Kafka provides a fair amount of configuration and pluggability out of the box. With configuration settings, you can control many features on the producer, broker, and consumer. Beyond simple settings, further customization can be found in several pluggable interfaces.

Looking at the producer side in particular, things like synchronous vs. asynchronous behavior, flush times, and compression are easily controlled with configuration settings. Taking the next step, several simple interfaces provide customizability around how generic types are converted to Kafka messages before sending to the broker. A callback interface (kafka.producer.async.CallbackHandler) also allows for recording or customizing data as it goes through the various stages of the AsyncProducer.

If you need still more control and are willing to take more responsibility, the kafka.producer.async.IEventHandler interface provides the hook-point. Implementing this interface is a bit lower level as you’re dealing with batching and sending to the broker yourself. Let’s take a look at an example to see what’s involved.

The standard AsyncProducer does a good job of batching the messages that have been queued up by topic and broker, but the result is a single kafka.message.Message for each item you’ve queued to the producer. Suppose instead you want to batch all messages for a particular topic/broker into a single Message. In this example we’ll simply concatenate the original string messages into a comma-delimited string that goes to the broker.

The interface to implement, assuming you’re doing this in Java, is kafka.javaapi.producer.async.EventHandler<T>. The primary method of interest is:

public void handle(List<QueueItem<String>> items, SyncProducer producer, Encoder<String> encoder)

This method is called for each batch and is responsible for getting the QueueItems sent to the broker. We see that the underlying SyncProducer is passed in, so that should be easy enough. Note that the String type parameter for QueueItem will vary based on the type your application is posting to the producer.

Here is what an implementation of that might look like:

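A sketch of such a handler, written against the Kafka 0.7-era Java API (the init/close lifecycle methods, the QueueItem accessors, and multiSend are recalled from that API and should be treated as assumptions; the helper methods are shown further below):

import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.javaapi.producer.SyncProducer;
import kafka.javaapi.producer.async.EventHandler;
import kafka.producer.async.QueueItem;
import kafka.serializer.Encoder;

public class BatchingEventHandler implements EventHandler<String> {

    public void init(Properties properties) {
        // no-op for this example
    }

    public void handle(List<QueueItem<String>> items, SyncProducer producer,
                       Encoder<String> encoder) {
        // Group the queued items by topic/partition and send one request per
        // destination through the underlying SyncProducer.
        Map<Destination, List<QueueItem<String>>> grouped = groupByDestination(items);
        producer.multiSend(createRequests(grouped, encoder));
    }

    public void close() {
        // no-op for this example
    }

    // groupByDestination, createRequests and getKafkaMessage are shown below.
}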

Simple enough, but the interesting behavior is in the helper methods. First off, we have a simple value class Destination that helps us map topics to partitions on the broker:

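Something along these lines (illustrative; the real class may differ):

// Value class used as a map key, so equals() and hashCode() matter.
public class Destination {
    public final String topic;
    public final int partition;

    public Destination(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Destination)) return false;
        Destination other = (Destination) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }
}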

Next is our helper that groups each of the QueueItems by the destination to which it is being sent:

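A sketch of that helper, as a method on the handler above (the getTopic()/getPartition() accessors on QueueItem are assumed from the 0.7-era API):

// Requires java.util.ArrayList and java.util.HashMap in the handler's imports.
private Map<Destination, List<QueueItem<String>>> groupByDestination(
        List<QueueItem<String>> items) {
    Map<Destination, List<QueueItem<String>>> grouped =
            new HashMap<Destination, List<QueueItem<String>>>();
    for (QueueItem<String> item : items) {
        Destination destination = new Destination(item.getTopic(), item.getPartition());
        List<QueueItem<String>> itemsForDestination = grouped.get(destination);
        if (itemsForDestination == null) {
            itemsForDestination = new ArrayList<QueueItem<String>>();
            grouped.put(destination, itemsForDestination);
        }
        itemsForDestination.add(item);
    }
    return grouped;
}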

Again, nothing too surprising here. Now that we have them grouped by destination, this is where we’ll deviate from the standard behavior. The createRequests method is responsible for creating the actual requests that go to the broker (through the SyncProducer).

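A sketch of createRequests, again as a method on the handler above. The kafka.javaapi.ProducerRequest and ByteBufferMessageSet constructors used here are recalled from the 0.7-era API and should be treated as assumptions; compression is deliberately ignored, as noted at the end of this post.

// Builds one ProducerRequest per destination, each wrapping a single message set.
private kafka.javaapi.ProducerRequest[] createRequests(
        Map<Destination, List<QueueItem<String>>> grouped, Encoder<String> encoder) {
    List<kafka.javaapi.ProducerRequest> requests =
            new ArrayList<kafka.javaapi.ProducerRequest>();
    for (Map.Entry<Destination, List<QueueItem<String>>> entry : grouped.entrySet()) {
        Destination destination = entry.getKey();
        kafka.javaapi.message.ByteBufferMessageSet messageSet =
                new kafka.javaapi.message.ByteBufferMessageSet(
                        getKafkaMessage(entry.getValue(), encoder));
        requests.add(new kafka.javaapi.ProducerRequest(
                destination.topic, destination.partition, messageSet));
    }
    return requests.toArray(new kafka.javaapi.ProducerRequest[requests.size()]);
}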

Notice the granularity of the original messages as they relate to a producer request and Kafka message. Multiple QueueItems are put into a single Kafka message as seen in the getKafkaMessage() method that concatenates all of the original messages for a destination into a single comma-delimited string:

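A sketch of getKafkaMessage (the Encoder.toMessage call is the 0.7-era way to turn the concatenated string into a kafka.message.Message; treat the details as assumptions):

// Concatenates all payloads for one destination into a comma-delimited string
// and wraps it in a single Kafka message. The List return type is only a
// convenience for the ByteBufferMessageSet constructor.
private List<kafka.message.Message> getKafkaMessage(
        List<QueueItem<String>> itemsForDestination, Encoder<String> encoder) {
    StringBuilder payload = new StringBuilder();
    for (QueueItem<String> item : itemsForDestination) {
        if (payload.length() > 0) {
            payload.append(",");
        }
        payload.append(item.getData());
    }
    List<kafka.message.Message> messages = new ArrayList<kafka.message.Message>();
    messages.add(encoder.toMessage(payload.toString()));
    return messages;
}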

While this method returns a List<Message>, that is merely a convenience for the caller – it will only ever add the single message. Also, note that we get the Encoder and the ProducerConfig in the callback, which allows us to honor those settings.

Finally, the EventHandler is passed to the Producer when it’s constructed either through a direct constructor call or by setting the “event.handler” producer property.

As is the case with most examples, this one does not implement all functionality and does little error-checking, in the interest of clarity. The standard behavior has additional functionality such as compression on a per-topic basis, so you’ll want to pay attention to that if you’re taking advantage of those features.


Understanding the Kafka Async Producer

We’ve been talking recently about our use of Apache Kafka–a system that takes a different view of implementing messaging. Because we’re using Kafka in a high-volume area of our system, we’ve had to spend time understanding how to get messages through reliably at high rates. While we’ve spent time on both the producing and consuming side of the brokers, this post focuses on the producing side only.

At a high level Kafka offers two types of producers–synchronous and asynchronous. As a user of the producer API, you’re always interacting with the kafka.producer.Producer class, but it’s doing a fair bit for you under-the-covers depending on how you’ve configured it.

By default the Producer uses SyncProducers underneath. SyncProducer does what it sounds like–it sends the messages to the broker synchronously on the thread that calls it. In most high volume environments this obviously isn’t going to suffice. It’s too many network sends on too many small-ish messages.

By setting the producer.type flag to ‘async’, the Producer class uses AsyncProducers under the covers. AsyncProducer offers the capability not only to do the sends on a separate thread, but to batch multiple messages together into a single send. Both characteristics are generally desirable–isolating network I/O from the threads doing computation and reducing the number of network messages into a smaller number of larger sends. Introductory information on the producers can be found here.

An important note is that Kafka supports compressing messages transparently such that neither your producer or consumer code has to do anything special. When configured for compression (via the compression.codec producer config setting) Kafka automatically compresses the messages before sending them to the broker. Importantly, this compression happens on the same thread that the send occurs.
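
As a point of reference, here is a minimal sketch of an async, compressing producer. The property keys and classes are recalled from the 0.7-era producer API (zk.connect, serializer.class, producer.type, compression.codec, batch.size, ProducerData) and should be verified against the version you run; the broker address and topic name are made up for the example.

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class AsyncProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "zookeeper-host:2181");                // example address
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");    // use AsyncProducers under the covers
        props.put("compression.codec", "1");    // 1 = GZIP in the 0.7-era configs
        props.put("batch.size", "200");         // messages per batched send

        Producer<String, String> producer =
                new Producer<String, String>(new ProducerConfig(props));
        producer.send(new ProducerData<String, String>("activities", "hello kafka"));
        producer.close();
    }
}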

Because the compress and send are both relatively time consuming operations, it’s an obvious place you may want to parallelize. But how parallel is the Producer?

A producer configured for async mode creates a separate underlying AsyncProducer instance for each broker via its internal ProducerPool class. Each AsyncProducer has its own associated background thread (named ProducerSendThread) that does the work for the send. In other words, there’s one thread per broker so your resulting parallelism for compressing and sending messages is based on the number of brokers in the cluster.

Here’s a snippet of the threads in an application with a single broker:

[Image: AsyncProducer thread dump with a single broker]

And here’s what they look like under the same producer load with 3 brokers in the cluster:

[Image: AsyncProducer thread dump with 3 brokers]

In the first image you can see that the single thread is doing more work than when it’s spread out, as you would expect.

A common question is “Why am I seeing QueueFullExceptions from the producer?” This often means the ProducerSendThreads are saturated and unable to keep up with the rate of messages going into the Producer. An obvious way to deal with this is adding brokers to the cluster, thus spreading the message sends across more threads as well as lightening the load on each broker.

Adding brokers to the cluster takes more consideration than just how efficient the producers are. Clearly there are other approaches you could take such as building a different async model on top of the SyncProducer that’s not dependent on the number of brokers. But it’s useful to know what the options are, and in the end adding brokers may be useful for your architecture in other ways.

If the topology of your producer applications is such that there are many applications producing a relatively small number of messages then none of this may be of concern. But if you have a relatively small number of producer applications sending many large messages, understanding how to spread out that work can be of real importance.

Finding Bugs at High Speed: Kafka Thread Safety

Whenever you are adding a new component to a system there are going to be integration pains. What do all these log lines mean? Is this an error I have to worry about, or is it part of normal operation? Sometimes these questions take a while to answer, particularly if the tools you are using are chatty.

Recently, we added Apache Kafka, a distributed messaging system, to our architecture. Kafka lets us focus on what we’re good at, namely streaming large numbers of activities very quickly, while opening up our architecture to a number of future scalability improvements.

When we first started experimenting with Kafka, we had a number of problems with it. Its logging was fairly chatty, and it would sometimes log stack traces under normal operation, which made it hard to figure out whether a particular log message signaled an error or not. We also had some configuration values set too low: some timeouts were so short that the clients’ connections to the Kafka servers were getting interrupted regularly.

One stack trace stood out, though: a null pointer exception from inside the Scala JSON parser (stacktrace).

I searched for the first line in the stack trace and found this issue in Scala’s issue tracker (issues.scala-lang.org/browse/SI-4929). It turned out that Scala’s standard library JSON parser wasn’t thread safe. It has been patched, but not for the version of Scala we’re using with Kafka.

Looking further up the call chain, I saw that the JSON parser was being called from within Kafka’s client code. Specifically, it was being called when the consumers were being rebalanced. Kafka rebalances consumers for a topic whenever a consumer is added to the topic or its session with ZooKeeper times out. We had our timeouts set too low, so that was happening often. In addition, we were creating multiple consumers within the same JVM. The end result was that we were parsing JSON from multiple threads at around the same time, fairly frequently, with a thread-unsafe parser. This was a recipe for a concurrency problem.

We had already forked Kafka to make some configuration changes, so I worked up a patch for our fork. I changed Kafka’s consumer code so it would create a new JSON parser each time it asked for topic information from ZooKeeper. With that change, the separate threads were no longer sharing a parser.

Creating a new parser every time Kafka needed data from ZooKeeper added a small amount of overhead in exchange for thread safety. Since rebalancing should happen infrequently, that overhead didn’t have much effect on performance, especially after we reconfigured the ZooKeeper timeouts so we stopped rebalancing unnecessarily.

To wrap things up, I filed a ticket on Kafka’s issue tracker with my patch (issues.apache.org/jira/browse/KAFKA-379). A fix was put in and now new releases of Kafka will not have thread safety issues with JSON parsing.