Enriching With Redis Part II: Schema Happens

As a follow-up to our first post about Redis at Gnip, we'll discuss how we built on our basic client-side Redis usage.

Client Side Woes

Early on, our Redis setup was great: it was performant, and we had failover, replication, and health checks. However, a lot of our client code for using Redis had become pretty low level. For instance, code to read or write an enrichment had to worry about, and implement, things like: which key is this going to live in? A hash? A set? How do I serialize/deserialize this value? What happens if it fails, or I need to retry? Does this operation need to be pipelined? What's the domain logic around the data?

Operational Woes

Another issue was handling iterations and evolutions of how we store the data, where we store it, and how we relate it to other things. Simply clearing the cache wasn't an option, since that would mean unreasonable downtime in our real-time pipeline of enrichments. Instead we versioned access to our Redis instance by explicitly branching around a config option. This allowed us to, say, write into both the "old" and "new" cache, and then switch reading over to "new" once it was fully populated. However, all the code to deal with reading, writing, and versioning quickly overwhelmed the domain intent of what we were doing, and made it difficult to reason about correct behavior.

Realization

What we noticed was that we implicitly had a schema, buried in all the serialization, ordering, and what-goes-where logic. Redis itself is schema-less (just commands against keys), but more often than not your application-specific usage of Redis does have a schema: a relationship between keys, the values they hold, and how your application interprets them.

Schema

We decided to formalize our schemas in source. You define a set of "keys", and then perform operations against those keys. For instance, this is how we logically store data in our profile location cache: "I want to be able to quickly look up a given Geo Entity by its Id. I'd also like to compact them into a set of hashes, for reasonable memory performance, by dividing the Id by 1000 to get the top-level hash and using the remainder as the field inside that hash. The Geo Entities I'll be storing are plain Google Protocol Buffers. Furthermore, I want to track both the total count of geo entities and each component hash key of the overall bucket of Geo Entities." Woo, what a mouthful! Imagine all the mechanical drudgery needed to remember all those details. In our schema definition this is what it looks like:

geoEntityBucketDefinition = new ComputedBucketDefinition<Integer, Enrichments.GeoEntity>()
                .prefix("gnip:GtGe")
                .keyStrategy(new DivideIntegerBy(1000))
                .transformer(new GpbTransformer<Integer, Enrichments.GeoEntity>(
                        false,
                        new GpbTransformerBuilder<Enrichments.GeoEntity>() {
                            @Override
                            public Enrichments.GeoEntity from(byte[] bytes) 
                              throws InvalidProtocolBufferException {
                                return Enrichments
                                  .GeoEntity
                                  .newBuilder()
                                  .mergeFrom(bytes).build();
                            }
                        }))
                .trackComponentKeys("gnip:MetaBuckets")
                .trackCounts("gnip:NumOfGeoEntities");

Pipeline vs Direct Access

One of the biggest performance wins when using Redis is utilizing the pipelining feature. Baking that into our Schema implementation was a requirement from day one. Before we can see how the schema helps with this, a detour into Jedis usage is required.

Pipelining in Jedis

Pipelining in Redis is simply sending multiple commands in one request, and then reading the responses back. But what does this look like in a client library? Let’s say we’re working with a key called foo. With Jedis we can simply say:

Jedis j = new Jedis();
String value = j.get("foo");

Ok, that’s pretty great. But what if foo were one of N keys that we wanted to pipeline? Well we’d do something like the following:

Jedis j = new Jedis();
Pipeline p = j.pipelined();

Response<String> valueResponse = p.get("foo");

p.sync();

String value = valueResponse.get();

Here you can see that Pipeline.get(String) returns a String wrapped in a Response type. A Response is simply a Promise or Future: it's a handle to the result of a command that hasn't run yet. When we invoke p.sync() we're telling Redis "hey, we're not sending any more commands, please process what I sent you". Jedis then parses the responses and fills in the values underneath the Response objects, so that Response<>.get() actually gives you data.

Pipelining with Schema

While the concept behind pipelines in Jedis is pretty straightforward, managing all of that state can easily crowd out the intent of a piece of code with a bunch of mechanics. Instead, the Schema has pipelining baked in, allowing you to access the same keys in the same way without having to track individual Response objects or worry about when it's safe to call get() on them. How? Every KeyDefinition type can build either a pipelined version or a direct version of itself. This also lets us perform any action against the schema within the context of our retry-aware, failover-enabled connection pool. Let's say this is our schema definition:

public class MySchema
  extends RedisSchema<MySchema.PipelineConnection, MySchema.DirectConnection> {
    private final ComputedBucketKeyDefinition geoEntityBucketDefinition;

    public MySchema() {
        geoEntityBucketDefinition = // Definition from above
    }

    @Override
    protected PipelineConnection buildPipeline(Pipeline p) {
        return new PipelineConnection(
            geoEntityBucketDefinition.buildPipeline(p)
        );
    }

    @Override
    protected DirectConnection buildDirect(Jedis j) {
        return new DirectConnection(
            geoEntityBucketDefinition.buildDirect(j)
        );
    }

    public static class PipelineConnection {
        private final PipelineBucketKey geoEntityBucketKey;

        PipelineConnection(PipelineBucketKey geoEntityBucketKey) {
            this.geoEntityBucketKey = geoEntityBucketKey;
        }

        public PipelineBucketKey geoEntityBucket() {
            return geoEntityBucketKey;
        }
    }

    // Same pattern for DirectConnection
}

Here we provide a way to build a pipeline or direct connection from our schema, by simply asking each definition to build the right version of its key and then exposing that for use.

Usage

Enrichments.GeoEntity geoEntity = schema.pipelineResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection, Deferred>() {
        @Override
        public Deferred execute(ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(1);
        }
    });

Map<Integer, Enrichments.GeoEntity> geoEntitiesById = schema.pipelineMapResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Map<Integer, Deferred<Enrichments.GeoEntity>>>() {
        @Override
        public Map<Integer, Deferred<Enrichments.GeoEntity>> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(geoIds);
        }
    });

Schema Versioning

A topic we touched on earlier was the idea of "versioning" our schema. The idea is to let us evolve the keys we use while minimizing downtime and confusion. Our solution is less a mechanism and more a pattern for how we use our schema definitions. First we append a version number to our schema definition:

public class MySchemaV0 { // etc }

Then we define an interface for what we want to do against the abstract notion of our “cache”:

public interface GeoEntityService {
    void put(GeoEntity geoEntity);
    GeoEntity get(Integer id);
}

And finally we glue our specific schema version to the interface:

public class GeoEntityServiceV0 implements GeoEntityService {
    public GeoEntityServiceV0(MySchemaV0 schema) { }
    // impls here
}

We can then control which "version" we bind to the abstract interface with a configuration option. For instance, we may stand up a second writer with V<N+1> while all the readers use V<N>. Once V<N+1> is populated, we can flip all the readers over to it just by changing their config.
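
A rough sketch of that glue (the "geo.cache.version" key and the V1 classes are hypothetical, purely for illustration; our real config plumbing differs):

import java.util.Properties;

// Hypothetical factory: only this wiring knows which schema version is currently
// bound to the GeoEntityService interface.
public class GeoEntityServiceFactory {
    public static GeoEntityService fromConfig(Properties config) {
        String version = config.getProperty("geo.cache.version", "v0");
        if ("v1".equals(version)) {
            return new GeoEntityServiceV1(new MySchemaV1()); // the V<N+1> binding
        }
        return new GeoEntityServiceV0(new MySchemaV0());
    }
}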

Enriching With Redis: Part 1

We’ve talked previously about how we use Redis to Track Cluster Health for our Historical PowerTrack product.

Now we’ll be exploring how we enrich with Redis.

Adding the Bling

For every activity that flows through Gnip, we add extra metadata that provides additional information that our customers are interested in. We call this extra metadata Enrichments, and it’s what allows customers to filter on URLs or Profile Location.

As you can imagine, enriching every activity in realtime requires a lot of work to make sure that we don't add significant delay to the realtime stream. When you think of an enrichment such as Profile Location, it's hard to imagine doing all the work of matching the user's bio location for every activity as it streams by.

Instead, we depend on “pre-computing” as much as we can, caching that pre-chewed data, and then in realtime quickly fetching and enriching a particular activity.

We still need something fast and low-latency to serve as our cache, and Redis fits that bill pretty nicely.

Profile Geo

For this post, we'll use our Profile Geo enrichment as an example. If you're not familiar with this enrichment you can read more about it here, but for a quick overview: this enrichment takes the opaque, user-defined profile location string from each activity and attempts to find a matching, structured geo entity. This adds metadata like name, timezone, long/lat center point, and hierarchical info (e.g. Boulder is part of Colorado). Our PowerTrack customers are then able to filter on this new data.

Infrastructure

Non-realtime processes are responsible for crunching and storing the relevant enrichment data into the Redis cache. The data is stored as Google Protocol Buffers for quick fetching and parsing, and to reduce memory pressure in Redis.

locaterator_diagram

For Profile Geo, we have an app that sniffs for geotagged activities. When it receives one, it attempts to geolocate the location string in that user's profile. If the geo entity we find for the profile location string is close to the longitude/latitude in the activity, we store a mapping between that profile location string and the given geo entity. If that mapping already exists, we increase its "score". Later, when our worker process performs the actual enrichment, it pulls the full set of mappings for that profile location string and chooses the best one based on score and other signals.

Redis really shines in this type of low-latency, batched workload. The main Redis instance in the Profile Location cache performs about 20k operations per second over a working set of 2.8GB.

Redis meet Gnip, Gnip meet Redis

Each Redis host also runs a Gnip-specific app that ties the Redis host into our own health checks and configuration rollout. The app also does a few extra things, like periodically uploading RDBs to S3 as insurance against losing an entire machine.

Jedis

We make use of the excellent Jedis library to access all the data that we store in our various Redis caches.

Failover

We built in a notion of "failover". All of our Redis instances participate in master-slave replication. Our "failover" is simple: given a list of priority-sorted Redis hosts, always try to use the highest-priority host.

For example, if the master goes down, every client fails over to querying the slave (the app responsible for populating the cache is never configured to see the slave, so it simply stops writing), and when the master comes back online, everyone starts querying it again.
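
Conceptually, the selection logic looks something like this (a simplified sketch rather than our actual connection-pool code, with a PING standing in for our real health check):

import java.util.List;
import redis.clients.jedis.Jedis;

// Simplified sketch of priority-based failover: always use the highest-priority
// host that currently passes a health check.
public class FailoverHostSelector {
    private final List<String> hostsByPriority; // e.g. ["redis-master:6379", "redis-slave:6379"]

    public FailoverHostSelector(List<String> hostsByPriority) {
        this.hostsByPriority = hostsByPriority;
    }

    public String pickHost() {
        for (String host : hostsByPriority) {
            if (isHealthy(host)) {
                return host;
            }
        }
        throw new IllegalStateException("No healthy Redis hosts available");
    }

    private boolean isHealthy(String hostAndPort) {
        String[] parts = hostAndPort.split(":");
        Jedis jedis = new Jedis(parts[0], Integer.parseInt(parts[1]));
        try {
            return "PONG".equals(jedis.ping());
        } catch (Exception e) {
            return false;
        } finally {
            jedis.disconnect();
        }
    }
}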

This lets us address operational things such as network hiccups, or node failures, but it also lets us do things like upgrade Redis live without having to stop delivering enrichments.

This is a trade off between availability and correctness. We’re ok with being a little bit stale if it means we can still serve a reasonable answer instead of full-on unavailability of the entire enrichment for everyone.

Optimizations

Pipelining

We are also able to get such good performance by using a feature called pipelining. Redis is a request/response protocol: send a command, wait, read the response. While this is simple, a network round trip per command is not going to give you good throughput when you need to keep up with fast-moving streams (like the Twitter firehose). But Redis also ships with a time-honored feature, pipelining, whereby you send multiple commands at once and then wait for the responses to all of those commands. This feature allows us to bulk write to and read from our caches.

Hashing

When we first started proofing out a cache with Redis, we used it in a pretty straightforward manner. Each individual geo entity was given its own key (i.e. `geo_entities:<id>`), with the value being the GPB representing all the interesting things about the entity (name, long/lat, etc.).

For example, if we had a geo entity with an id of ’1235594′, we’d get this as the key:

`geo_entities:1235594`. The value for that key would be the binary protobuf value.

However, this naive approach is wasteful with memory. With the knowledge that Redis stores hashes efficiently and some inspiration from this blog post, we tweaked our data model to get better memory performance.

We decided to take each geo entity Id, divide it by 1000, and use the quotient to build the key. We treat that key as a hash, and use the "remainder" (Id - (quotient * 1000)) as the field within that hash. By creating many hashes we give Redis a chance to do more efficient encoding of the fields and values within each hash, which dramatically reduces our memory usage. (You can read more about Redis memory optimization here.)

Again working with a geo entity with an id of ’1235594′:

The original plan is simple; the key is: `geo_entities:1235594`, and we can do a simple SET:

SET 'geo_entities:1235594' <binary GPB value>

Using hashes, things are a little bit more complicated:

key_id = 1235594 / 1000 = 1235 # integer division
field_id = 1235594 - (1235 * 1000) = 594

Given that, we just do an HSET:

HSET 'geo_entities:1235' '594' <binary GPB value>

Another example, with geo entity id 1235777:

HSET 'geo_entities:1235' '777' <binary GPB value>
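
In client code the same arithmetic is only a few lines with Jedis (a sketch; the GeoEntityCache wrapper and its method names are ours for illustration, not our production classes):

import redis.clients.jedis.Jedis;

// Sketch of the bucketed-hash scheme described above.
public class GeoEntityCache {
    private final Jedis jedis;

    public GeoEntityCache(Jedis jedis) {
        this.jedis = jedis;
    }

    public void put(int geoEntityId, byte[] serializedGpb) {
        int keyId = geoEntityId / 1000;              // integer division -> top-level hash
        int fieldId = geoEntityId - (keyId * 1000);  // remainder -> field within that hash
        jedis.hset(("geo_entities:" + keyId).getBytes(),
                   String.valueOf(fieldId).getBytes(),
                   serializedGpb);
    }

    public byte[] get(int geoEntityId) {
        int keyId = geoEntityId / 1000;
        int fieldId = geoEntityId - (keyId * 1000);
        return jedis.hget(("geo_entities:" + keyId).getBytes(),
                          String.valueOf(fieldId).getBytes());
    }
}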

However, the payoff is worth it. At time of posting, we hold about 3.5 million individual (normalized) user-generated profile location mappings to a candidate set of geo entities. Each member of that set has associated metadata that we use to improve and refine the system over time.

All told, that consumes about 3GB of memory in Redis. On average that is around 100 bytes per unique user profile location. That is a pretty great use of space!

Same Time, Same Place

Next time, we’ll discuss another abstraction we’ve built over Jedis to ease interacting with Redis from our client applications.

Bend, Don’t Break

This post explains how we solved a major problem we faced: the challenge of scaling our realtime data processing systems to handle ever-increasing volumes without drastically over-building. By using Kafka as a queuing system in the initial ingest, we’ve been able to build a system that elegantly scales to handle unpredictable load and that lets us “go wide” when we need to process higher than normal data volumes.

Drinking From The Firehose

At Gnip, we are continuously ingesting realtime firehoses of activities from multiple social media publishers (Twitter, Tumblr, WordPress, just to name a few). Due to breaking news stories or major events, the number of activities per second that we consume can spike up significantly. We have seen, for instance, upwards of sustained 20,000 Tweets per second coming in from Twitter during the Super Bowl or on election night – more than twice the average of 6-8,000 Tweets per second we see these days.

So far, we have always scaled each of our components to handle the highest imaginable load. This presents a challenge though, since the “highest imaginable load” increases continuously as use of social media increases. Three years ago, on average, we saw 1,000 Tweets per second. Now we are up to more than 6,000 Tweets per second every day, and peak volumes have increased in a similar fashion. Rather than have to continuously adjust and plan ahead for what we think the “highest imaginable” load might be, we’ve sought an approach that would let us handle whatever volumes might come in the front door.

Bend, Don’t Break

Therefore, we have spent a lot of time and effort thinking about what we do when we get a spike that is higher than the highest imaginable load – say 50,000 Tweets a second. Who knows, perhaps the 2chan community will flash mob Twitter again, and we wouldn't want to miss that. We want to make sure that we handle such an event gracefully, which for us concretely means never dropping an activity or missing an enrichment. (When we add metadata to an activity – like a Klout score or an unwound URL – we call that an "enrichment".)

So, inspired by this blogpost, we decided that we will provision our inbound (or edge) infrastructure that consumes the firehoses to handle anything the publishers could ever throw at us, but have the internal components be more flexible. This allows us to guarantee an even higher level of reliability without spending exponentially more resources just to cover rare peak events.

Borrowing some football terminology we call this the “Bend, but don’t break” strategy.

Kafka

The critical piece of open source software that allows us to do this easily is Kafka, since consumers can read from Kafka topics at their own pace; many other queuing systems are not as tolerant of slow consumers. Our internal components can apply back pressure and nothing is lost, as long as they don't fall behind the window of data retained in the Kafka queues. (The retention period is limited only by how much you'd like to spend on drives.)

The consequence for downstream components is that the spikes are somewhat smoothed out. The following picture compares the volume we received from a publisher at the inbound edge of our infrastructure with the inbound volume of an internal component during a load test with a sustained volume of 25k Tweets/second.

As you can see, the internal component handled 20k Tweets/second quite happily, but started applying back pressure at 25k Tweets/second and therefore took a couple of minutes to drain the backlog until it had completely caught up with realtime. This load test helped us identify the need to further scale that component to handle a possible 25k Tweets/second in realtime, so we increased capacity accordingly. But even though the spike was larger than expected, we did not experience any data loss or operational issues. Full data fidelity at all times is a major benefit of this approach and a requirement of our products.

Implementation

In order to make this work, we ensure that the apps never pull more work from Kafka than they can handle. For instance, in one step of our pipeline we extract information required for calculating enrichments from activities, e.g. Tweet text in order to run language classification, and then distribute that work to multiple worker hosts in order to parallelize the computation. The workers pull work from queues on a distributor app as fast as they can complete work.

Since we build our backend systems in Java, we leverage java.util.concurrent quite extensively. For these queues we use LinkedBlockingQueue, so that if the workers can't keep up, the threads on the distributor block until the queues get drained. It is just as important to size the blocking queues properly when they are created:

LinkedBlockingQueue<EnrichmentRequest> requestQueue =
       new LinkedBlockingQueue<EnrichmentRequest>(MAX_QUEUE_SIZE);

Since reading from Kafka happens on the same thread, we naturally apply back pressure. MAX_QUEUE_SIZE depends mostly on how much memory you have available and how quickly you would want to apply back pressure.
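
A rough sketch of that read loop follows (not our actual code: a plain Iterator<String> stands in for the Kafka consumer iterator, and String for our EnrichmentRequest type):

import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

// Because put() blocks when the queue is full, the same thread stops pulling
// from Kafka and back pressure propagates upstream.
public class Distributor implements Runnable {
    private static final int MAX_QUEUE_SIZE = 10_000; // tune to memory and how quickly to push back

    private final LinkedBlockingQueue<String> requestQueue =
            new LinkedBlockingQueue<String>(MAX_QUEUE_SIZE);
    private final Iterator<String> activities; // stand-in for a Kafka consumer iterator

    public Distributor(Iterator<String> activities) {
        this.activities = activities;
    }

    public LinkedBlockingQueue<String> requestQueue() {
        return requestQueue;
    }

    @Override
    public void run() {
        try {
            while (activities.hasNext()) {
                requestQueue.put(activities.next()); // blocks once MAX_QUEUE_SIZE is reached
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}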

One additional bit of work the distributor app has to do is park activities for a couple of seconds, to give the workers time to calculate the enrichments. Again, java.util.concurrent comes in handy for building a BlockingDelayQueue:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;

public class BlockingDelayQueue<T extends Delayed> {
    private final DelayQueue<T> delayQueue = new DelayQueue<T>();
    private final Semaphore capacitySemaphore;

    public BlockingDelayQueue(int capacity) {
        capacitySemaphore = new Semaphore(capacity);
    }

    public void put(T item) throws InterruptedException {
        capacitySemaphore.acquire();
        delayQueue.put(item);
    }

    public T take() throws InterruptedException {
        T item = delayQueue.take();
        capacitySemaphore.release();
        return item;
    }

    public int size() {
        return delayQueue.size();
    }
}

 

This class is built to support back pressure: if the queue is not getting drained, the application automatically backs off.
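
Anything you park in it needs to implement java.util.concurrent.Delayed; a minimal, hypothetical wrapper might look like this:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper: parks an activity id for a fixed number of milliseconds
// so that workers have time to compute its enrichments before it is released.
public class ParkedActivity implements Delayed {
    private final String activityId;
    private final long releaseAtMillis;

    public ParkedActivity(String activityId, long parkMillis) {
        this.activityId = activityId;
        this.releaseAtMillis = System.currentTimeMillis() + parkMillis;
    }

    public String getActivityId() {
        return activityId;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(releaseAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

A BlockingDelayQueue<ParkedActivity> created with a fixed capacity then gives you both the parking delay and the automatic back-off described above.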

Scalability

Apart from the persistent queuing feature from Kafka, the system also gains natural horizontal scalability out of the box, since internal components can be parallelized by consuming only a subset of partitions. In Kafka each topic is spread out across a configurable number of partitions in order to support horizontal scalability. Stay tuned to the blog for a deeper dive into what it takes to pump firehoses through Kafka and not miss a drop.

References

  1. http://kafka.apache.org

  2. http://mechanical-sympathy.blogspot.co.uk/2012/05/apply-back-pressure-when-overloaded.html

Building a Killer (Twitter) Search UI

Gnip is primarily an API company, providing its customers with API access to the social data activities they request and putting the onus of processing, analysis and display of the data on said customer.

In the case of our Search API product, many of our customers want to expose the functionality and control directly to their paying users, in a way that is similar to traditional search engines (Google, Twitter Search, etc). This type of integration is different from our HTTP Stream-based products, which require more behind-the-scenes management by our customers. So to demonstrate to our customers and prospects the type of applications they could build into their product suites using the Search API, we built Twitter Search on Rails.

I want to explain how I developed this bootstrap project for your use and why you would want to use it. The code may be Rails and CoffeeScript, but the concepts stand on their own. First, a demo:

Twitter Search on Rails Demo

Multiple Visualizations

There are many ways to slice and dice interesting tweets. Three ways our customers frequently do so include: trend analysis, a dashboard view of actual Tweet content, and geographic distribution.

Frequency over time

One request to Gnip’s Search Count API gives a nice history of frequency in the form of a line chart like this one.

Frequency of Black Friday tweets over time

You can see how I configured Highcharts to make the above chart in chart.coffee.

Geographic distribution

You can extract a great geographic distribution using the Profile Geo Enrichment mapped with MapBox:

Geo distribution of Black Friday tweets

Toss in a Marker Clusterer (for grouping geographically co-located points) and you’ll add another unique perspective to viewing the data.

Tweets that look like tweets

Visualizations are nice, but content is king™. We convert Activity Streams JSON into a tweet that conforms to Twitter's Developer Display Guidelines using a yet-to-be-announced project. That means that entities like usernames and hashtags are linked; Retweet, Reply and Favorite work through Twitter Web Intents; and tweets support RTL display, among other things.

When it comes to search, the query results have to stand out and behave the way a user would expect them to.

Decoupled Web Components

One can break the typical search app into fairly obvious pieces like the search form, the results, data retrieval etc. These pieces can operate independently of one another through the use of JavaScript events. This Pub/Sub model is perfect for decoupling parts of web apps.


Not only that, but if you also inject a DocumentFragment or some other container to be rendered to, you can unit/functional test this part of your webapp independently! Check out the less trivial implementation in activityList.coffee

Fast, Elegant Transitions

I think everyone would agree that they want a fast search experience. That doesn't mean it can't be pretty; we just have to avoid animations that are known to be slow. The HTML5Rocks article on High Performance Animations explains what is performant, and why, much better than I could. TL;DR: use CSS transitions, not JS; only transform opacity, rotate, translate, and scale.


With that in mind, I found inspiration from Hakim El Hattab’s stroll.js when coming up with a simple but slick way to load search results.


About Search API on Rails

I originally wrote an application intended to show customers what they could build on top of Gnip’s Search API. It was such a hit that we had many requests from customers to license the code for building their own applications — so we decided to open-source it to facilitate integrations with the Search API.

We chose Rails due to its popularity and, hell, we already know Rails. CoffeeScript's classes made it easier to reason about web components with an inheritance chain, and I like its syntax. Sass is our CSS preprocessor of choice here at Gnip.

This project isn't just some proof of concept; it was written deliberately and with care.

We hope you’ll find this background helpful when building your application on top of the Search API with Twitter Search on Rails! If you find an improvement, we welcome your contribution through GitHub. Enjoy!

Exploring S3 Read Performance

This blog post is a collaboration between Jud Valeski and Nick Matheson.

When you use petabytes as the unit to measure your S3 storage, you're dealing with a lot of data. Gnip has been a tremendously heavy S3 user since S3 came online, and it remains our large-scale durable storage solution.

Over the years, we’ve worked with a wide array of data access models to accomplish a variety of things. From backfilling a customer’s data outage from our archives, to creating low-latency search products, how and where we access data is an integral part of our thinking.

If you've ever worked with really large data sets, you know that the main challenge is moving them from application A to application B. As it turns out, sneakernet is often the best way.

Some of our datasets on S3 are large enough that moving them around is impractical, so we've had to explore various data access patterns and approaches that use S3 as a big blob backend store.

One such approach is using HTTP byte-range requests into our S3 buckets in order to access just the blocks of data we're interested in. This allows one to separate the data request from the actual "file" (which may be inconveniently large to transfer across the wire in low-latency access scenarios). It does, however, require an index into said S3 buckets, which needs to be calculated ahead of time.
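
The ranged read itself is straightforward with the AWS SDK for Java (a hedged sketch; the bucket, key, and offsets below are made up, and in practice they come from the precomputed index):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;

public class RangedS3Read {
    public static void main(String[] args) throws Exception {
        AmazonS3 s3 = new AmazonS3Client(); // credentials come from the default provider chain

        long offset = 1_048_576L;  // hypothetical byte offset supplied by the index
        long length = 10 * 1024L;  // 10KB block

        GetObjectRequest request = new GetObjectRequest("example-archive-bucket", "2013/12/01/activities.blob")
                .withRange(offset, offset + length - 1); // HTTP Range header, inclusive

        S3Object slice = s3.getObject(request);
        java.io.InputStream content = slice.getObjectContent();
        try {
            byte[] block = IOUtils.toByteArray(content);
            System.out.println("Read " + block.length + " bytes");
        } finally {
            content.close(); // release the connection promptly, as noted below
        }
    }
}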

Gnip recently experimented with this concept. Here are the findings. We’ve run into a few folks over the years who considered using S3 in this manner and we wanted to share our knowledge in hopes of furthering exploration of S3 as a blob storage device.

The Setup

Our tests explored a few combinations of node (ec2 instance) count, thread count and byte range size. The node count was varied from 1 to 7, the thread count from 1 to 400 and the byte range size between 1KB and 10KB. The range lookups were performed randomly over a single bucket with roughly half a million keys and 100TB of data.

The Results

The investigation started with a focus on scalability of a single node. The throughput of a single thread is not very compelling at ~8 lookups per second, but these lookups scale near linearly with increased thread count, even out to 400 threads on our c1.xlarge instance. At 400 threads making 10KB lookups network saturation becomes an issue as sustained 40+ MB/s is high for a c1.xlarge in our experience.

The next phase of the investigation focused on scaling out by adding more nodes. For these tests there was a significant difference between the scalability of 1KB vs 10KB lookups. With 10KB lookups even scaling beyond 2 nodes yielded rapidly diminishing returns. In the context of this test, we were unable to find a similar limit for 1KB lookups although there was a slight slope change for the last data point (7 nodes). It should be noted that these are limits for our specific bucket/key/partition structure and access pattern and thus may not be representative of true S3 limits or other usage scenarios.

In addition to the scalability tests, we also tracked the request latencies, with a focus on the 90th, 95th and 99th percentile times. Under light and moderate load, the 99th percentile times were both in the 450-500ms range. Under moderate and heavy load these degraded significantly, to 1s and 6s respectively. The frequency of read/connect timeouts under moderate/heavy load increased significantly as well. There was significant variance in the percentile results, both over time of day/week and minute to minute.

The Bottlenecks

Overall, this approach scaled better than we had initially anticipated but it did take some work on the client side to be as lean as possible. Things like keep-alive/connection pool tuning and releasing network resources as soon as possible (gotta read from/close those input streams) were all necessary to get to this level of throughput.

These tests started to bump into both per-node and aggregate bandwidth limits. On a per-node basis this is unlikely to be of practical concern, since at this level of throughput CPU was at 50% utilization and any 'real application' logic would likely become the actual bottleneck, effectively easing off the network. We will continue to investigate the aggregate performance levels, as we suspect this has more to do with our particular data distribution, usage and S3 partitioning, for which we have done limited tuning/investigation.

These tests should also be taken with a grain of salt, as they represent a sunny-day scenario that will likely not hold under the variety of conditions we see running production systems at Amazon.

 

Custom Event Batching with Apache Kafka

Kafka provides a fair amount of configuration and pluggability out of the box. With configuration settings, you can control many features on the producer, broker, and consumer. Beyond simple settings, further customization can be found in several pluggable interfaces.

Looking at the producer side in particular, things like synchronous vs. asynchronous behavior, flush times, and compression are easily controlled with configuration settings. Taking the next step, several simple interfaces provide customizability around how generic types are converted to Kafka messages before sending to the broker. A callback interface (kafka.producer.async.CallbackHandler) also allows for recording or customizing data as it goes through the various stages of the AsyncProducer.

If you need still more control and are willing to take more responsibility, the kafka.producer.async.IEventHandler interface provides the hook-point. Implementing this interface is a bit lower level as you’re dealing with batching and sending to the broker yourself. Let’s take a look at an example to see what’s involved.

The standard AsyncProducer does a good job of batching the messages that have been queued up by topic and broker, but the result is a single kafka.message.Message for each item you’ve queued to the producer. Suppose instead you want to batch all messages for a particular topic/broker into a single Message. In this example we’ll simply concatenate the original string messages into a comma-delimited string that goes to the broker.

The interface to implement, assuming you’re doing this in Java, is kafka.javaapi.producer.async.EventHandler<T>. The primary method of interest is:

public void handle(List<QueueItem<String>> items, SyncProducer producer, Encoder<String> encoder)

This method is called for each batch and is responsible for getting the QueueItems sent to the broker. We see that the underlying SyncProducer is passed in so that should be easy enough. Note that the String type parameter for QueueItem will vary based on the type your application is posting to the producer.

At a high level, the implementation groups the queued items by destination, collapses each group into a single Kafka message, and hands the resulting requests to the SyncProducer.


Simple enough, but the interesting behavior is in the helper methods. First off, we have a simple value class Destination that helps us map topics to partitions on the broker:

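A plausible reconstruction (not the original code) is just a small value class with proper equals/hashCode so it can serve as a map key:

// Hypothetical stand-in for the Destination value class: a topic/partition pair
// with value semantics.
public final class Destination {
    private final String topic;
    private final int partition;

    public Destination(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    public String getTopic() { return topic; }
    public int getPartition() { return partition; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Destination)) return false;
        Destination other = (Destination) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return 31 * topic.hashCode() + partition;
    }
}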

Next is our helper that groups each of the QueueItems by the destination to which it is being sent:

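A hedged sketch of that helper (we assume QueueItem exposes getTopic/getPartition/getData accessors, which may be named slightly differently in the 0.7 API; it is shown as a standalone utility so the snippet is self-contained, though in practice it would be a private method on the handler):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.producer.async.QueueItem;

// Groups queued items by their (topic, partition) destination.
public class QueueItemGrouper {
    public static Map<Destination, List<String>> groupByDestination(List<QueueItem<String>> items) {
        Map<Destination, List<String>> grouped = new HashMap<Destination, List<String>>();
        for (QueueItem<String> item : items) {
            Destination destination = new Destination(item.getTopic(), item.getPartition());
            List<String> forDestination = grouped.get(destination);
            if (forDestination == null) {
                forDestination = new ArrayList<String>();
                grouped.put(destination, forDestination);
            }
            forDestination.add(item.getData());
        }
        return grouped;
    }
}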

Again, nothing too surprising here. Now that we have them grouped by destination, this is where we’ll deviate from the standard behavior. The createRequests method is responsible for creating the actual requests that go to the broker (through the SyncProducer).


Notice the granularity of the original messages as they relate to a producer request and Kafka message. Multiple QueueItems are put into a single Kafka message as seen in the getKafkaMessage() method that concatenates all of the original messages for a destination into a single comma-delimited string:

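A hedged reconstruction of that method (assuming the 0.7-era Encoder's toMessage method, and that the payloads themselves never contain commas):

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import kafka.message.Message;
import kafka.serializer.Encoder;

// Collapses all queued strings for one destination into a single comma-delimited
// payload and encodes it as one Kafka Message.
public class MessageBatcher {
    public static List<Message> getKafkaMessage(List<String> originals, Encoder<String> encoder) {
        StringBuilder joined = new StringBuilder();
        for (Iterator<String> it = originals.iterator(); it.hasNext();) {
            joined.append(it.next());
            if (it.hasNext()) {
                joined.append(',');
            }
        }
        List<Message> messages = new ArrayList<Message>();
        messages.add(encoder.toMessage(joined.toString())); // only ever one Message per destination
        return messages;
    }
}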

While this method returns a List<Message>, that is merely a convenience for the caller; it will only ever add the single message. Also, note that we get the Encoder and the ProducerConfig in the callback, which allows us to honor those settings.

Finally, the EventHandler is passed to the Producer when it’s constructed either through a direct constructor call or by setting the “event.handler” producer property.

 As is the case with examples, this one does not implement all functionality and does little error-checking in the interest of clarity. The standard behavior has additional functionality such as compression on a per-topic basis, so you’ll want to pay attention to that if you’re taking advantage of those features.

 

 

The Quick Left Rube Goldberg Hackfest

Rube Goldberg Hackfest

Do you like to geek out? like to code? to tinker? to hack? Ever wondered what it’s like to wrangle the Twitter firehose? Like to build overly complicated and interesting machines that perform a simple task? Of course you do! The Quick Left Rube Goldberg Hackfest on March 27th is your chance to dabble in a little of all of that!

We are very excited to be opening up some real, live, Twitter (and possibly other publishers) streams for participants to develop against. Our product streams are consumed daily by social media monitoring firms, Fortune 500 companies and the like to perform important business analysis. But who cares about that? This will be your opportunity to dream up a ridiculous use  for them in your own recipe to hack together a goofy, complicated and most importantly, fun, machine.

Gnip will be co-sponsoring the hackfest alongside ATOMS Express, Mobiplug, and KST. Whether you want to dive in headfirst and play with all of these toys to your heart's content (well, just 3 hours, and please don't break them) or you just want to come hang out and watch the madness ensue, all are welcome. There will be beer and certainly some snackage. Participants, make sure to register for the event beforehand! See you there!

 

The Eclectic Art Of System Monitoring

The Problem

At Gnip, we deploy our software to hundreds of servers across multiple data centers and heterogeneous environments. Just architecting the software itself and how to create a streamlined deployment process on this scale is in itself worth a full blog post. In this post though I would like to focus on the aspect of how to make sure that your full application stack is healthy all the time.

The Solution?

On the mechanical side the solution is very straightforward: you deploy a system monitoring tool that continuously watches all servers and all applications and sends you friendly messages when something does not look right. There are plenty of tools available for this purpose. We ourselves use Nagios, which has to some extent become an industry standard.

That was easy, right?

Sadly, that first sentence leaves a lot of details out, and these details can kill your application stack and your engineering team at the same time (either by not finding real problems, or by keeping your engineers up all night). In this post, I'd like to share how we at Gnip go about keeping software, hardware, and people-ware happy.

Watching All Applications

It is certainly not enough to just ensure that an application is running. In order to get meaningful insight, you have to add health checks that are particular to the application itself. For instance, in an application that consumes a continuous stream of data, you can be fairly certain that there is a problem if the transaction volume drops below a certain rate. Nagios allows quite easy integration with these kinds of application-specific health checks. We decided to expose an HTTP endpoint on each of our applications that returns a digest of all health checks registered for that application. This allows the Nagios check to be uniform across all applications.

TIP: Standardize health APIs
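
A minimal sketch of that idea, using the JDK's built-in HttpServer (simplified and not our production code; the check names and wiring are illustrative):

import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

// A uniform /health endpoint that aggregates registered checks, so a single
// Nagios probe works for every application.
public class HealthEndpoint {
    private final Map<String, Supplier<Boolean>> checks = new LinkedHashMap<>();

    public void register(String name, Supplier<Boolean> check) {
        checks.put(name, check);
    }

    public void start(int port) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/health", exchange -> {
            StringBuilder body = new StringBuilder();
            boolean healthy = true;
            for (Map.Entry<String, Supplier<Boolean>> entry : checks.entrySet()) {
                boolean ok = entry.getValue().get();
                healthy &= ok;
                body.append(entry.getKey()).append(": ").append(ok ? "OK" : "FAIL").append('\n');
            }
            byte[] bytes = body.toString().getBytes(StandardCharsets.UTF_8);
            exchange.sendResponseHeaders(healthy ? 200 : 500, bytes.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(bytes);
            }
        });
        server.start();
    }
}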

Sending Friendly Messages

We configured our system so that certain types of alerts in our production system cause text messages to be sent to the engineer who is on call. All engineers participate in this on-call rotation. This process instills a strong sense of ownership, because it does not take many pages in the middle of the night to change an engineer's attitude towards writing reliable and scalable code. (If it doesn't bother the engineer herself, the grumpy spouse will have a strong opinion, too.)

TIP: Every engineer participates in on-call rotation

We also have a set of health checks that will send out emails instead of pages. We use this for situations where it appears that no immediate action can or has to be taken.

TIP: Categorize actionable and non-actionable alerts

What Does Healthy Mean Anyway?

In this setup it quickly becomes apparent that finding the right thresholds for triggering alerts is non-trivial. Due to inconsistent inbound data volumes, it is easy to set thresholds that are too aggressive, which leads to a flood of meaningless noise drowning out critical failures. Set thresholds that are too casual, on the other hand, and you'll never find out about real problems. So, instead of using your gut instinct to set thresholds, mine data like log files and metrics to figure out what historically would have been appropriate thresholds.

TIP: Mine metrics for proper thresholds

Self Healing

As it turns out, you periodically get the same alert for the same reason, and the fix is to manually step through some actions that usually resolve the problem. At that point, the obvious question is why not automate the fix and trigger it directly when the problem is detected by the health check. The tendency can be for engineers to postpone the automation effort, partially because it is not interesting feature work. Additionally, it can be tremendously tedious to automate the manual steps, since they frequently involve tools that application developers are not always exposed to (cron, bash, etc.).

Sometimes you might have to spend a couple of hours automating something that takes one minute to do manually. But the importance of reducing context switches away from feature work cannot be stressed enough, and the investment will always pay off.

TIP: Application developers learn a new tool every week!

Conclusion

One can categorize issues in a production system by assessing whether an issue raised an alert, whether the issue requires manual intervention, and whether the issue is an actual problem or just a reporting flaw. The following Venn diagram illustrates a situation that leads to unhappy customers and tired engineers:

So, for a smoothly running engineering organization you clearly want to focus your energy on getting to a better state.

TIP: Get to this state:

Chef, EC2 Auto Scaling, and Spot Instances for Fun and Profit

If you have a problem with dynamic CPU and/or I/O requirements that is easily distributed across homogeneous nodes, EC2 Auto Scaling can be a great tool to add to your stack.  If you can design your application in a way that gracefully handles nodes terminating while processing work, Amazon EC2 Spot Instances can provide great cost savings.  We've used these technologies in conjunction with Resque with great success.  This model would likely fit other job management frameworks such as Sidekiq or Gearman.

If you’re in your own datacenter you can achieve similar functionality with Eucalyptus or OpenStack/Heat, but we’ll focus on using EC2 Auto Scaling as a concrete example for this blog post.  By the end of this post, you should have a good idea of how to set up your own EC2 Auto Scaling cluster using Chef and Spot instances.

Create a Package That Installs and Runs Chef
You’ll want to create a package (rpm, deb, etc..) that contains definitions of all the software necessary to start “doing work” on your newly provisioned worker node.  We managed to achieve this by creating an RPM that includes our chef cookbooks using the process outlined in this previous blog post.  In the next step we’ll create an AMI that will install these cookbooks via yum when our node boots up and then run chef to turn our base server into a fully functional worker node.

Create Base AMI
We subscribe to the principle that you should have a small number of Amazon Machine Images (AMIs) that have only the minimal amount of information to bootstrap themselves.  This information is typically just:

1.  Which environment am I currently in? (review, staging, or prod)
2.  What type of node am I?  (worker node, web server, etc..)
3.  Where can I download the necessary cookbooks to provision myself? (configure yum or deb sources)

Following this principle, we have created an AMI that runs a “firstboot.sh” init.d script.  This first boot script will configure the node to look at the appropriate yum repo and install the RPM we created in the previous step.  This way the AMI can remain relatively static and you can iterate on your bootstrap code without having to follow the cumbersome process of creating a new AMI each time.  After the cookbooks have been pulled down to the local filesystem, our first boot script will run chef solo to install the necessary software to start “doing work”.

In order to create your own AMI, you’ll need to:

1.  Boot a new EC2 Server.
2.  Install Amazon EC2 Command Line Tools
3.  Customize your server to run your own “first boot” script that will install and run chef-solo.
4.  Run the following command to bundle your server to a compressed image in the /tmp directory:

ec2-bundle-vol -k /path/to/aws-priv-key.pem \
  -c /path/to/aws-cert.pem -u <aws-account-id>

5.  Upload your bundled image to s3

ec2-upload-bundle -b <s3_path> -m /tmp/image.manifest.xml \
  -a <access_key_id> -s <secret_key>

6.  Register your newly created AMI with your account

ec2-register <s3_path>/image.manifest.xml \
  -K /path/to/aws-priv-key.pem -C /path/to/aws-cert.pem

Create Auto Scaling Group
Now that we have an AMI that will start performing work upon boot, we can leverage Amazon's EC2 Auto Scaling to start going wide.  The idea is that you define a cluster of machines that use the AMI we just created, and you declare how many servers you want up at any given time.  If you want to spin up 50 servers, you simply set the "DesiredCapacity" for your group to 50, and within minutes you will have 50 fresh new worker nodes.  There are two discrete steps needed to make this happen.  Let's illustrate how to do this with Fog:

Create Launch Config

as = Fog::AWS::AutoScaling.new(:aws_access_key_id => access_key_id, 
                               :aws_secret_access_key => access_secret_key)
as.create_launch_configuration(<ami_id>, 
                               <machine_type>, 
                               <launch_config_name>, 
                               "SecurityGroups" => <security_groups>, 
                               "KeyName" => <aws_key_pair_name>, 
                               "SpotPrice" => <spot_bid_price>)

This will create a launch configuration that we will use to define our Auto Scaling group.

Create Auto Scaling Group

as.create_auto_scaling_group(<auto_scaling_group_name>, 
                             <availability_zones>, 
                             <launch_config_name>, 
                             <max_size>, <min_size>, 
                             "DesiredCapacity" => <number_of_instances>)

This will create an Auto Scaling Group and will spin up <number_of_instances> servers using the AMI defined in our launch configuration above.

Note that one of the parameters we've passed to our Launch Configuration is "SpotPrice".  This allows you to leverage Amazon's Spot Instances.  The idea is that you pay whatever the "market rate" is for the given machine_type you're provisioning.  If the market rate rises above your SpotPrice, instances in your cluster will begin to terminate, so your application should be tolerant of these failures.  If this is a mission-critical application, you should likely create a "backup" Auto Scaling group without the SpotPrice parameter.  This means you will pay the On-Demand price for your given machine_type, but it allows you to continue processing work when resources are sparse.

Grow / Shrink Auto Scaling Group

Depending on your application, you’ll likely want to grow and shrink your Auto Scaling group depending on how much work needs to be done.  This is as simple as the following API call:

as.set_desired_capacity(<auto_scaling_group_name>, 
                        <number_of_instances>)

The ability to easily spin up 500 worker nodes with one API call can be a very powerful thing, especially when dealing with the amount of data we deal with at Gnip.

Future improvements include wiring this process into Chef Server for easier centralized config management across datacenters.  If these sorts of challenges sound interesting, be sure to check out our job postings; we'd love to talk to you!

Understanding the Kafka Async Producer

We’ve been talking recently about our use of Apache Kafka–a system that takes a different view of implementing messaging. Because we’re using Kafka in a high-volume area of our system, we’ve had to spend time understanding how to get messages through reliably at high rates. While we’ve spent time on both the producing and consuming side of the brokers, this post focuses on the producing side only.

At a high level Kafka offers two types of producers–synchronous and asynchronous. As a user of the producer API, you’re always interacting with the kafka.producer.Producer class, but it’s doing a fair bit for you under-the-covers depending on how you’ve configured it.

By default the Producer uses SyncProducers underneath. SyncProducer does what it sounds like–it sends the messages to the broker synchronously on the thread that calls it. In most high volume environments this obviously isn’t going to suffice. It’s too many network sends on too many small-ish messages.

By setting the producer.type flag to ‘async’, the Producer class uses AsyncProducers under the covers. AsyncProducer offers the capability not only to do the sends on a separate thread, but to batch multiple messages together into a single send. Both characteristics are generally desirable–isolating network I/O from the threads doing computation and reducing the number of network messages into a smaller number of larger sends. Introductory information on the producers can be found here.

An important note is that Kafka supports compressing messages transparently, such that neither your producer nor consumer code has to do anything special. When configured for compression (via the compression.codec producer config setting), Kafka automatically compresses the messages before sending them to the broker. Importantly, this compression happens on the same thread as the send.
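
For reference, flipping a producer into async, compressed mode is purely configuration. The following is a sketch against the 0.7-era Java API; the ZooKeeper host, topic name, and codec value are illustrative:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.javaapi.producer.ProducerData;
import kafka.producer.ProducerConfig;

public class AsyncProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zk.connect", "zookeeper1:2181");                       // illustrative host
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("producer.type", "async");       // use AsyncProducers under the covers
        props.put("compression.codec", "1");       // compress on the ProducerSendThread

        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        producer.send(new ProducerData<String, String>("activities", "hello"));
        producer.close();
    }
}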

Because the compress and send are both relatively time consuming operations, it’s an obvious place you may want to parallelize. But how parallel is the Producer?

A producer configured for async mode creates a separate underlying AsyncProducer instance for each broker via its internal ProducerPool class. Each AsyncProducer has its own associated background thread (named ProducerSendThread) that does the work for the send. In other words, there’s one thread per broker so your resulting parallelism for compressing and sending messages is based on the number of brokers in the cluster.

Here’s a snippet of the threads in an application with a single broker:

AsyncProducer

And here’s what they look like under the same producer load with 3 brokers in the cluster:

AsyncProducer

In the first image you can see that the single thread is doing more work than when it’s spread out, as you would expect.

A common question is "Why am I seeing QueueFullExceptions from the producer?" This often means the ProducerSendThreads are saturated and unable to keep up with the rate of messages going into the Producer. An obvious way to deal with this is adding brokers to the cluster, thus spreading the message sends across more threads as well as lightening the load on each broker.

Adding brokers to the cluster takes more consideration than just how efficient the producers are. Clearly there are other approaches you could take such as building a different async model on top of the SyncProducer that’s not dependent on the number of brokers. But it’s useful to know what the options are, and in the end adding brokers may be useful for your architecture in other ways.

If the topology of your producer applications is such that there are many applications producing a relatively small number of messages then none of this may be of concern. But if you have a relatively small number of producer applications sending many large messages, understanding how to spread out that work can be of real importance.