Bend, Don’t Break

This post explains how we solved a major problem: scaling our realtime data processing systems to handle ever-increasing volumes without drastically over-building. By using Kafka as a queuing system for the initial ingest, we’ve been able to build a system that scales elegantly to handle unpredictable load and that lets us “go wide” when we need to process higher than normal data volumes.

Drinking From The Firehose

At Gnip, we continuously ingest realtime firehoses of activities from multiple social media publishers (Twitter, Tumblr, and WordPress, to name a few). Breaking news stories and major events can make the number of activities per second we consume spike significantly. During the Super Bowl or on election night, for instance, we have seen sustained volumes upwards of 20,000 Tweets per second from Twitter – more than twice the 6,000-8,000 Tweets per second we average these days.

So far, we have always scaled each of our components to handle the highest imaginable load. That presents a challenge, though, since the “highest imaginable load” keeps rising as social media use grows. Three years ago we saw 1,000 Tweets per second on an average day; now we are up to more than 6,000 Tweets per second, and peak volumes have increased in a similar fashion. Rather than continuously adjust and plan ahead for whatever we think the “highest imaginable” load might be, we sought an approach that would let us handle whatever volumes come in the front door.

Bend, Don’t Break

Therefore, we have spent a lot of time and effort thinking about what we do when we get a spike that is higher than the highest imaginable load – say 50,000 Tweets per second. Who knows, perhaps the 2chan community will flash mob Twitter again, and we wouldn’t want to miss that. We want to handle such an event gracefully, which for us concretely means never dropping an activity or missing an enrichment. (When we add metadata to an activity – like a Klout score or an unwound URL – we call that an ‘enrichment’.)

So, inspired by this blog post on applying back pressure when overloaded [2], we decided to provision our inbound (or edge) infrastructure, which consumes the firehoses, to handle anything the publishers could ever throw at us, while keeping the internal components more flexible. This allows us to guarantee an even higher level of reliability without spending exponentially more resources just to cover rare peak events.

Borrowing some football terminology, we call this the “Bend, but don’t break” strategy.

Kafka

The critical piece of open source software that allows us to do this easily is Kafka [1], because consumers can read from Kafka topics at their own pace; many other queuing systems are not as tolerant of slow consumers. Our internal components can apply back pressure and nothing is lost, as long as they don’t fall behind the window of data retained in the Kafka queues. (The retention period is limited only by how much you’d like to spend on drives.)

The consequence for downstream components is that spikes are somewhat smoothed out. The following picture compares the volume we receive from a publisher at the inbound edge of our infrastructure with the inbound volume at an internal component during a load test with a sustained volume of 25k Tweets/second.

As you can see, the internal component handled 20k Tweets/second quite happily, but started applying back pressure at 25k Tweets/sec and therefore took a couple of minutes to drain the backlog until it had completely caught up with realtime. This load test helped us identify the need to further scale that component to handle a possible 25k Tweets/sec in realtime, so we increased capacity accordingly. But even though the spike was larger than expected, we did not experience any data loss or operational issues. Full data fidelity at all times is a major benefit of this approach and a requirement of our products.

Implementation

In order to make this work, we ensure that the apps never pull more work from Kafka than they can handle. For instance, in one step of our pipeline we extract from each activity the information required for calculating enrichments (e.g. the Tweet text in order to run language classification) and then distribute that work to multiple worker hosts in order to parallelize the computation. The workers pull work from queues on a distributor app as fast as they can complete it.

Since we build our backend systems in Java, we leverage the functionality in java.util.concurrent quite extensively. For these queues we use LinkedBlockingQueue, so that if the workers can’t keep up, the threads on the distributor block until the queues get drained. It is just as important to size the blocking queues properly when they are created:

LinkedBlockingQueue<EnrichmentRequest> requestQueue =
       new LinkedBlockingQueue<EnrichmentRequest>(MAX_QUEUE_SIZE);

Since reading from Kafka happens on the same thread, we naturally apply back pressure. MAX_QUEUE_SIZE depends mostly on how much memory you have available and how quickly you want to apply back pressure.
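As a rough sketch of that reading loop (the class, topic, and request type names are hypothetical, and it uses the current KafkaConsumer client rather than the consumer we ran at the time): because put() blocks once the queue holds MAX_QUEUE_SIZE requests, the polling thread simply stops pulling from Kafka until the workers catch up.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class EnrichmentDistributor implements Runnable {
    // Tune to available memory and how quickly back pressure should kick in.
    private static final int MAX_QUEUE_SIZE = 10000;

    private final LinkedBlockingQueue<EnrichmentRequest> requestQueue =
            new LinkedBlockingQueue<EnrichmentRequest>(MAX_QUEUE_SIZE);

    private final KafkaConsumer<String, String> consumer;

    public EnrichmentDistributor(Properties consumerConfig) {
        this.consumer = new KafkaConsumer<String, String>(consumerConfig);
        this.consumer.subscribe(Collections.singletonList("activities")); // hypothetical topic name
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<String, String> record : records) {
                try {
                    // put() blocks when the queue is full, so we stop polling Kafka
                    // until the workers have drained some requests.
                    requestQueue.put(new EnrichmentRequest(record.value()));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Workers call this to pull work as fast as they can complete it.
    public EnrichmentRequest nextRequest() throws InterruptedException {
        return requestQueue.take();
    }

    /** Placeholder for the request type used in the snippet above. */
    public static class EnrichmentRequest {
        final String activityJson;
        EnrichmentRequest(String activityJson) { this.activityJson = activityJson; }
    }
}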

One additional bit of work the distributor app has to do is to park activities for a couple of seconds in order to give the workers time to calculate the enrichments. Again, java.util.concurrent comes in handy for building a BlockingDelayQueue:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;

/**
 * A DelayQueue with a bounded capacity: put() blocks once the queue is full,
 * which is how back pressure propagates to whoever is feeding it.
 */
public class BlockingDelayQueue<T extends Delayed> {
    private final DelayQueue<T> delayQueue = new DelayQueue<T>();
    private final Semaphore capacitySemaphore;

    public BlockingDelayQueue(int capacity) {
        capacitySemaphore = new Semaphore(capacity);
    }

    public void put(T item) throws InterruptedException {
        capacitySemaphore.acquire(); // blocks when the queue is at capacity
        delayQueue.put(item);
    }

    public T take() throws InterruptedException {
        T item = delayQueue.take(); // blocks until an item's delay has expired
        capacitySemaphore.release();
        return item;
    }

    public int size() {
        return delayQueue.size();
    }
}

 

This class is built to support back pressure: if the queue is not getting drained, puts block and the application backs off automatically.
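To make the parking step concrete, here is a hypothetical wrapper (not our actual implementation) for an activity parked in such a queue; anything stored in a DelayQueue has to implement Delayed roughly along these lines:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Hypothetical wrapper that parks an activity for a fixed number of seconds. */
public class DelayedActivity implements Delayed {
    private final String activityJson;
    private final long releaseAtNanos;

    public DelayedActivity(String activityJson, long parkSeconds) {
        this.activityJson = activityJson;
        this.releaseAtNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(parkSeconds);
    }

    public String getActivityJson() {
        return activityJson;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining park time; the DelayQueue only releases the item once this reaches zero.
        return unit.convert(releaseAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

With that, the distributor can park each activity via put(new DelayedActivity(json, 2)), and a downstream thread’s take() returns it once the two seconds are up and the enrichments have had time to arrive.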

Scalability

Apart from Kafka’s persistent queuing, the system also gains natural horizontal scalability out of the box, since internal components can be parallelized by having each instance consume only a subset of partitions. In Kafka, each topic is spread across a configurable number of partitions in order to support horizontal scalability. Stay tuned to the blog for a deeper dive into what it takes to pump firehoses through Kafka and not miss a drop.
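As a minimal illustration of that property (hypothetical broker, group, and topic names, again using the current consumer client): running several instances of a component with the same group.id is all it takes for Kafka to split a topic’s partitions among them.

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PartitionedConsumerExample {
    public static KafkaConsumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092"); // hypothetical brokers
        props.put("group.id", "enrichment-distributor");           // instances sharing this id split the partitions
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        // Each instance subscribed to the same topic with the same group.id
        // is assigned a subset of the topic's partitions.
        consumer.subscribe(Collections.singletonList("activities")); // hypothetical topic
        return consumer;
    }
}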

References

  1. http://kafka.apache.org

  2. http://mechanical-sympathy.blogspot.co.uk/2012/05/apply-back-pressure-when-overloaded.html

The Eclectic Art Of System Monitoring

The Problem

At Gnip, we deploy our software to hundreds of servers across multiple data centers and heterogeneous environments. Architecting the software itself and creating a streamlined deployment process at this scale is worth a full blog post of its own. In this post, though, I would like to focus on how to make sure that your full application stack is healthy all the time.

The Solution?

On the mechanical side, the solution is very straightforward: you deploy a system monitoring tool that continuously watches all servers and all applications and sends you friendly messages when something does not look right. There are plenty of tools available for this purpose. We use Nagios, which has become something of an industry standard.

That was easy, right?

Sadly, that first sentence leaves out a lot of details, and these details can kill your application stack and your engineering team at the same time (either by not finding your problems, or by keeping your engineers up all night). In this post, I’d like to share how we at Gnip go about keeping software, hardware, and people-ware happy.

Watching All Applications

It is certainly not enough to just ensure that an application is running. In order to get meaningful insight, you have to add health checks that are specific to the application itself. For instance, in an application that consumes a continuous stream of data, you can be fairly certain that there is a problem if the transaction volume drops below a certain rate. Nagios integrates quite easily with these kinds of application-specific health checks. We decided to expose an HTTP endpoint on each of our applications that returns a digest of all health checks registered for that application, which allows the Nagios check to be uniform across all applications.

TIP: Standardize health APIs
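As a sketch of what such a standardized endpoint could look like (the /health path, the HealthCheck interface, and the use of the JDK’s built-in HttpServer are all assumptions, not our actual implementation):

import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;

public class HealthEndpoint {
    /** Hypothetical health check contract: each application registers its own checks. */
    public interface HealthCheck {
        String name();
        boolean isHealthy();
    }

    private final List<HealthCheck> checks = new CopyOnWriteArrayList<HealthCheck>();

    public void register(HealthCheck check) {
        checks.add(check);
    }

    /** Serves a digest of all registered checks; the monitoring side polls one uniform URL on every app. */
    public void start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/health", (HttpExchange exchange) -> {
            StringBuilder body = new StringBuilder();
            boolean allHealthy = true;
            for (HealthCheck check : checks) {
                boolean healthy = check.isHealthy();
                allHealthy &= healthy;
                body.append(check.name()).append(": ").append(healthy ? "OK" : "FAIL").append('\n');
            }
            byte[] bytes = body.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(allHealthy ? 200 : 500, bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
    }
}

Each application registers whatever checks make sense for it, while the Nagios side only ever needs to know about one URL format and one status convention.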

Sending Friendly Messages

We configured our system so that certain types of alerts in production cause text messages to be sent to the engineer who is on call. All engineers participate in this on-call rotation. This creates a strong sense of ownership, because it does not take many pages in the middle of the night to change an engineer’s attitude towards writing reliable and scalable code. (If it doesn’t bother the engineer herself, the grumpy spouse will have a strong opinion, too.)

TIP: Every engineer participates in on-call rotation

We also have a set of health checks that send out emails instead of pages. We use these for situations where no immediate action can or needs to be taken.

TIP: Categorize actionable and non-actionable alerts

What Does Healthy Mean Anyway?

With this setup it quickly becomes apparent that finding the right thresholds for triggering alerts is non-trivial. Because inbound data volumes are inconsistent, it is easy to set thresholds that are too aggressive, leading to a flood of meaningless noise that drowns out critical failures. Set thresholds that are too casual, on the other hand, and you’ll never find out about real problems. So, instead of using gut instinct to set thresholds, mine data like log files and metrics to figure out what thresholds would historically have been appropriate.

TIP: Mine metrics for proper thresholds
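One simple way to do that, sketched below with hypothetical names and numbers, is to take historical per-second transaction counts and pick a low percentile as the alert threshold rather than a number chosen by gut feel.

import java.util.Arrays;

public class ThresholdMiner {
    /**
     * Returns the value below which only `percentile` percent of historical
     * counts fell, e.g. the 1st percentile of the last 30 days. Alerting below
     * that value would historically have fired only rarely.
     */
    public static long thresholdFromHistory(long[] counts, double percentile) {
        long[] sorted = counts.clone();
        Arrays.sort(sorted);
        int index = (int) Math.floor(sorted.length * percentile / 100.0);
        index = Math.min(Math.max(index, 0), sorted.length - 1);
        return sorted[index];
    }

    public static void main(String[] args) {
        // Hypothetical per-second Tweet counts; in practice these come from
        // mined log files or a metrics store.
        long[] history = { 5400, 6100, 5900, 7200, 6800, 5100, 6600, 5800 };
        System.out.println("Alert if volume drops below: "
                + thresholdFromHistory(history, 1.0) + " Tweets/second");
    }
}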

Self Healing

As it turns out, you periodically get the same alert for the same reason, and the fix is to manually step through some actions that usually resolve the problem. At that point, the obvious question is why not automate the fix and trigger it directly when the health check detects the problem. The tendency for engineers can be to postpone the automation effort, partly because it is not interesting feature work, and partly because automating the manual steps can be tremendously tedious, since they frequently involve tools that application developers are not regularly exposed to (cron, bash, etc.).

Sometimes you might have to spend a couple of hours automating something that takes one minute to do manually. But the importance of reducing context switches away from feature work cannot be stressed enough, and the investment will always pay off.

TIP: Application developers learn a new tool every week!

Conclusion

One can categorize issues in a production system by whether an issue raised an alert, whether the issue requires manual intervention, and whether the issue is an actual problem or just a reporting flaw. The following Venn diagram illustrates a situation that leads to unhappy customers and tired engineers:

So, for a smoothly running engineering organization you clearly want to focus your energy on getting to a better state.

TIP: Get to this state: