Enriching With Redis Part II: Schema Happens

As a follow-up to our first post about Redis at Gnip, we'll discuss how we built on our basic client-side Redis usage.

Client Side Woes

Early on, our Redis setup was great: it was performant, and we had failover, replication, and health checks. However, a lot of our client code for using Redis had become pretty low level. For instance, code to read or write an enrichment had to worry about, and implement, things like: Which key does this live in? A hash? A set? How do I serialize/deserialize this value? What happens if it fails, or I need to retry? Does this operation need to be pipelined? What's the domain logic around the data?

Operational Woes

Another issue was handling iterations and evolutions in how we store the data, where we store it, and how we relate it to other things. Simply clearing the cache wasn't an option, since that would mean unreasonable downtime in our real-time pipeline of enrichments. Instead we versioned access to our Redis instance by explicitly branching on a config option. This allowed us to, say, write into both the "old" and "new" cache, and then switch reading over to "new" once it was fully populated. However, all the code to deal with reading, writing, and versioning quickly overwhelmed the domain intent of what we were doing, and made it difficult to reason about correct behavior.
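
To make that concrete, here is a minimal sketch of the kind of config-branched, dual-write code this pattern required. GeoEntityCache and CacheConfig are hypothetical stand-ins for illustration, not actual Gnip classes:

public class VersionedGeoEntityStore {
    private final GeoEntityCache oldCache;
    private final GeoEntityCache newCache;
    private final CacheConfig config;

    public VersionedGeoEntityStore(GeoEntityCache oldCache, GeoEntityCache newCache, CacheConfig config) {
        this.oldCache = oldCache;
        this.newCache = newCache;
        this.config = config;
    }

    public void put(int id, byte[] serializedGeoEntity) {
        oldCache.put(id, serializedGeoEntity);      // keep the "old" layout populated
        if (config.writeToNewCache()) {             // dual-write while migrating
            newCache.put(id, serializedGeoEntity);
        }
    }

    public byte[] get(int id) {
        // Reads flip over only once the "new" cache is fully populated.
        return config.readFromNewCache() ? newCache.get(id) : oldCache.get(id);
    }

    public interface GeoEntityCache {
        void put(int id, byte[] value);
        byte[] get(int id);
    }

    public interface CacheConfig {
        boolean writeToNewCache();
        boolean readFromNewCache();
    }
}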

Realization

What we noticed was that we implicitly had a schema hiding in all the serialization, ordering, and what-goes-where logic. Redis itself is schema-less (just commands against keys), but more often than not your application-specific usage of Redis does have a schema: a relationship between keys, the values they hold, and how your application interprets them.

Schema

We decided to formalize our schemas in source. You define a set of "keys", and then perform operations against those keys. For instance, this is how we logically store data in our profile location cache: "I want to be able to quickly look up a given Geo Entity by its Id. I'd also like to compact them into a set of hashes for reasonable memory performance, by dividing the Id by 1000 to get the top-level hash and using the remainder as the field inside that hash. The Geo Entities I'll be storing are plain Google Protocol Buffers. Furthermore, I want to track both the total count of Geo Entities and each component hash key of the overall bucket of Geo Entities." Woo! What a mouthful! Imagine all the mechanical drudgery needed to remember all those details. In our schema definition, this is what it looks like:

geoEntityBucketDefinition = new ComputedBucketDefinition<Integer, Enrichments.GeoEntity>()
                .prefix("gnip:GtGe")
                .keyStrategy(new DivideIntegerBy(1000))
                .transformer(new GpbTransformer<Integer, Enrichments.GeoEntity>(
                        false,
                        new GpbTransformerBuilder<Enrichments.GeoEntity>() {
                            @Override
                            public Enrichments.GeoEntity from(byte[] bytes) 
                              throws InvalidProtocolBufferException {
                                return Enrichments
                                  .GeoEntity
                                  .newBuilder()
                                  .mergeFrom(bytes).build();
                            }
                        }))
                .trackComponentKeys("gnip:MetaBuckets")
                .trackCounts("gnip:NumOfGeoEntities");

Pipeline vs Direct Access

One of the biggest performance wins when using Redis is utilizing the pipelining feature. Baking that into our Schema implementation was a requirement from day one. Before we can see how the schema helps with this, a detour into Jedis usage is required.

Pipelining in Jedis

Pipelining in Redis is simply sending multiple commands in one request, and then reading the responses back. But what does this look like in a client library? Let’s say we’re working with a key called foo. With Jedis we can simply say:

Jedis j = new Jedis();
String value = j.get("foo");

Ok, that's pretty great. But what if foo were one of N keys that we wanted to pipeline? Well, we'd do something like the following:

Jedis j = new Jedis();
Pipeline p = j.pipelined();

Response<String> valueResponse = p.get("foo");

p.sync();

String value = valueResponse.get();

Here you can see that Pipeline.get(String) returns a String wrapped in a Response type. Response is essentially a Promise or Future: it's a handle to the result of a command that hasn't run yet. When we invoke p.sync() we're telling Redis "hey, we're not sending any more commands, please process what we sent you". Jedis then parses the responses and fills in the values underneath the Response objects, so that Response<>.get() actually gives you data.

Pipelining with Schema

While the concept behind pipelines in Jedis is pretty straightforward, managing all the state involved can easily crowd out the intent of a piece of code with a bunch of mechanics. Instead, the Schema has pipelining baked in, allowing you to access the same keys in the same way without having to track individual Response objects or worry about when it's OK to call get() on them. How? Every KeyDefinition type can build a pipelined or a direct version of itself. This also lets us perform any actions against the schema in the context of our retry-aware, failover-enabled connection pool. Let's say this is our schema definition:

public class MySchema
  extends RedisSchema<MySchema.PipelineConnection, MySchema.DirectConnection> {
    private final ComputedBucketKeyDefinition geoEntityBucketDefinition;

    public MySchema() {
        geoEntityBucketDefinition = // Definition from above
    }

    @Override
    protected PipelineConnection buildPipeline(Pipeline p) {
        return new PipelineConnection(
            geoEntityBucketDefinition.buildPipeline(p)
        );
    }

    @Override
    protected DirectConnection buildDirect(Jedis j) {
        return new DirectConnection(
            geoEntityBucketDefinition.buildDirect(j)
        );
    }

    public static class PipelineConnection {
        private final PipelineBucketKey geoEntityBucketKey;

        PipelineConnection(PipelineBucketKey geoEntityBucketKey) {
            this.geoEntityBucketKey = geoEntityBucketKey;
        }

        public PipelineBucketKey geoEntityBucket() {
            return geoEntityBucketKey;
        }
    }

    // Same pattern for DirectConnection
}

Here we provide a way to build a pipeline or direct connection from our schema by asking each definition to build the right version of its key, and then exposing that for use.

Usage

Enrichments.GeoEntity geoEntity = schema.pipelineResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Deferred<Enrichments.GeoEntity>>() {
        @Override
        public Deferred<Enrichments.GeoEntity> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(1);
        }
    });

Map<Integer, Enrichments.GeoEntity> geoEntitiesById = schema.pipelineMapResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Map<Integer, Deferred<Enrichments.GeoEntity>>>() {
        @Override
        public Map<Integer, Deferred<Enrichments.GeoEntity>> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(geoIds);
        }
    });

Schema Versioning

A topic we touched on last time was the idea of "versioning" our schema. The idea is to allow us to evolve the keys we use while minimizing downtime and confusion. Our solution is less a framework and more a pattern for how we use our schema definitions. First, we append a version number to our schema definition:

public class MySchemaV0 { // etc }

Then we define an interface for what we want to do against the abstract notion of our “cache”:

public interface GeoEntityService {
    void put(GeoEntity geoEntity);
    GeoEntity get(Integer id);
}

And finally we glue our specific schema version to the interface:

public class GeoEntityServiceV0 implements GeoEntityService {
    public GeoEntityServiceV0(MySchemaV0 schema) { }
    // impls here
}

We can then control which "version" we bind to the abstract interface with a configuration option. For instance, we may stand up a second writer with V<N+1> while all the readers use V<N>. Once V<N+1> is populated, we can flip all the readers over to it just by changing their config.
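
As a sketch, that binding might be a small factory keyed off the config value; GeoEntityServiceV1 and MySchemaV1 are the hypothetical next iteration here:

// Hypothetical factory: the configured version decides which schema-backed
// implementation sits behind the GeoEntityService interface.
public static GeoEntityService buildGeoEntityService(String configuredVersion) {
    switch (configuredVersion) {
        case "v0":
            return new GeoEntityServiceV0(new MySchemaV0());
        case "v1":
            return new GeoEntityServiceV1(new MySchemaV1());
        default:
            throw new IllegalArgumentException("Unknown cache schema version: " + configuredVersion);
    }
}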

Enriching With Redis: Part 1

We’ve talked previously about how we use Redis to Track Cluster Health for our Historical PowerTrack product.

Now we’ll be exploring how we enrich with Redis.

Adding the Bling

For every activity that flows through Gnip, we add extra metadata that our customers are interested in. We call this extra metadata Enrichments, and it's what allows customers to filter on URLs or Profile Location.

As you can imagine, enriching every activity in realtime requires a lot of work to make sure we don't add significant delay to the realtime stream. Consider an enrichment such as Profile Location: it's hard to imagine doing all the work of matching the user's bio location for every activity as we see it.

Instead, we depend on “pre-computing” as much as we can, caching that pre-chewed data, and then in realtime quickly fetching and enriching a particular activity.

We still need something low-latency and fast to serve as our cache, and Redis fits that bill pretty nicely.

Profile Geo

For this post, we'll use our Profile Geo enrichment as the example for all of our work. If you're not familiar with this enrichment you can read more about it here, but for a quick overview: this enrichment takes the opaque, user-defined profile location string from each activity and attempts to find a matching, structured geo entity. This adds metadata like name, timezone, long/lat center point, and hierarchical info (e.g. Boulder is part of Colorado). Our PowerTrack customers are then able to filter on this new data.

Infrastructure

Non-realtime processes are responsible for crunching the relevant enrichment data and storing it in the Redis cache. The data is stored as Google Protocol Buffers for quick fetching and parsing, and to reduce memory pressure in Redis.

[Diagram: the Locaterator profile-geo pipeline]

For Profile Geo, we have an app that sniffs for geotagged activities. When it receives one, it attempts to geolocate the location string in that user's profile. If the geo entity we find from the profile location string is close to the longitude/latitude in the activity, we store a mapping between that profile location string and the geo entity. If the mapping already exists, we increase a "score" for it. Later, when our worker process performs the actual enrichment, it pulls the full set of mappings for that profile location string and chooses the best one based on score and other signals.
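
As a rough sketch of that scoring idea (the key layout here is illustrative, not necessarily how our cache is actually organized), a Redis sorted set per normalized profile location string fits well: ZINCRBY bumps a candidate geo entity's score on each confirming match, and ZREVRANGE pulls the candidates back ordered by score.

import redis.clients.jedis.Jedis;

public class ProfileLocationScorer {
    private static final String PREFIX = "gnip:ProfileLocCandidates:"; // illustrative key prefix

    private final Jedis jedis;

    public ProfileLocationScorer(Jedis jedis) {
        this.jedis = jedis;
    }

    // Bump the score for a (profile location -> geo entity) mapping.
    public void recordMatch(String normalizedProfileLocation, int geoEntityId) {
        jedis.zincrby(PREFIX + normalizedProfileLocation, 1.0, Integer.toString(geoEntityId));
    }

    // Fetch the top candidates, best score first, for the enrichment worker to choose from.
    public void printTopCandidates(String normalizedProfileLocation) {
        for (String geoEntityId : jedis.zrevrange(PREFIX + normalizedProfileLocation, 0, 4)) {
            System.out.println(geoEntityId);
        }
    }
}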

Redis really shines in this type of low-latency, batched workload. The main Redis instance in the Profile Location cache performs about 20k operations per second over a working set of 2.8G.

Redis meet Gnip, Gnip meet Redis

Each Redis host also runs a Gnip-specific app that ties the Redis host into our own health checks and configuration rollout. The app does a few extra things, like periodically uploading RDBs to S3 as insurance against losing an entire machine.

Jedis

We make use of the excellent Jedis library to access all the data that we store in our various Redis caches.

Failover

We built in an idea of "failover". All of our Redis instances participate in master-slave replication. Our "failover" is simple: given a priority-sorted list of Redis hosts, always try to use the highest-priority host.

For example, if the master goes down, every client fails over to querying the slave (the app responsible for populating the cache is never configured to see the slave, so it simply stops writing), and when the master comes back online, everyone starts querying it again.
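
A minimal sketch of that priority-ordered failover with Jedis (our real connection pool adds retries and health checks, so treat this as illustrative):

import java.util.List;

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.Jedis;

public class PriorityFailover {
    private final List<HostAndPort> hostsByPriority; // highest priority (the master) first

    public PriorityFailover(List<HostAndPort> hostsByPriority) {
        this.hostsByPriority = hostsByPriority;
    }

    // Always use the highest-priority host that responds to PING.
    public Jedis connect() {
        for (HostAndPort host : hostsByPriority) {
            Jedis jedis = new Jedis(host.getHost(), host.getPort());
            try {
                if ("PONG".equalsIgnoreCase(jedis.ping())) {
                    return jedis;
                }
            } catch (Exception e) {
                // Host is down or unreachable; fall through to the next one.
            }
            jedis.close();
        }
        throw new IllegalStateException("No Redis host available");
    }
}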

This lets us address operational things such as network hiccups, or node failures, but it also lets us do things like upgrade Redis live without having to stop delivering enrichments.

This is a trade-off between availability and correctness. We're OK with being a little bit stale if it means we can still serve a reasonable answer, instead of the entire enrichment being unavailable for everyone.

Optimizations

Pipelining

We are also able to get such good performance by using a feature called pipelining. Redis is a request/response protocol: send a command, wait, read the response. While this is simple, a network round trip per command is not going to give you good throughput when you need to keep up with fast-moving streams (like the Twitter firehose). But Redis also ships with a time-honored feature, pipelining, whereby you send multiple commands at once and then wait for the responses to all of those commands. This feature allows us to bulk write to and read from our caches.

Hashing

When we first started proofing out a cache with Redis, we used it in a pretty straightforward manner. Each individual geo entity was given its own key (i.e. `geo_entity:`), with the value being the GPB representing all the interesting things about the Entity (name, long/lat, etc).

For example, a geo entity with an id of `1235594` would get the key `geo_entities:1235594`, and the value for that key would be the binary protobuf.

However, this naive approach is wasteful with memory. With the knowledge that Redis stores hashes efficiently and some inspiration from this blog post, we tweaked our data model to get better memory performance.

We decided to take each geo entity Id, divide it by 1000, and use the quotient to build the key. We treat that key like a hash and use the "remainder" (Id - quotient * 1000) as the field within that hash. By creating many hashes, we give Redis a chance to encode the fields and values within each hash more efficiently, which dramatically reduces our memory usage. (You can read more about Redis memory optimization here.)

Again working with a geo entity with an id of ‘1235594’:

The original plan is simple; the key is `geo_entities:1235594`, and we can do a simple SET:

SET 'geo_entities:1235594' <serialized geo entity>

Using hashes, things are a little bit more complicated:

key_id = 1235594 / 1000 = 1235 # integer division
field_id = 1235594 - (1235 * 1000) = 594

Given that, we just do an HSET:

HSET 'geo_entities:1235' '594' <serialized geo entity>

Another example, with id 1235777:

HSET 'geo_entities:1235' '777' <serialized geo entity>
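
Putting those two steps together, a small sketch of reading and writing through the hashed layout with Jedis might look like this (class and method names are illustrative):

import java.nio.charset.StandardCharsets;

import redis.clients.jedis.Jedis;

public class HashedGeoEntityStore {
    private static final String PREFIX = "geo_entities:";
    private static final int BUCKET_SIZE = 1000;

    private final Jedis jedis;

    public HashedGeoEntityStore(Jedis jedis) {
        this.jedis = jedis;
    }

    // HSET geo_entities:<id / 1000> <id % 1000> <serialized GPB>
    public void put(int geoEntityId, byte[] serializedGeoEntity) {
        jedis.hset(keyFor(geoEntityId), fieldFor(geoEntityId), serializedGeoEntity);
    }

    // HGET geo_entities:<id / 1000> <id % 1000>
    public byte[] get(int geoEntityId) {
        return jedis.hget(keyFor(geoEntityId), fieldFor(geoEntityId));
    }

    // 1235594 -> "geo_entities:1235"
    private static byte[] keyFor(int geoEntityId) {
        return (PREFIX + (geoEntityId / BUCKET_SIZE)).getBytes(StandardCharsets.UTF_8);
    }

    // 1235594 -> "594"
    private static byte[] fieldFor(int geoEntityId) {
        return Integer.toString(geoEntityId % BUCKET_SIZE).getBytes(StandardCharsets.UTF_8);
    }
}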

However, the payoff is worth it. At time of posting, we hold about 3.5 million individual (normalized) user-generated profile location mappings to a candidate set of geo entities. Each member of that set has associated metadata that we use to improve and refine the system over time.

All told that consumes about 3G of memory in Redis. On average that is around 100 bytes per unique user profile location. That is a pretty great use of space!

Same Time, Same Place

Next time, we’ll discuss another abstraction we’ve built over Jedis to ease interacting with Redis from our client applications.

Bend, Don’t Break

This post explains how we solved a major problem we faced: scaling our realtime data processing systems to handle ever-increasing volumes without drastically over-building. By using Kafka as a queuing system for initial ingest, we've been able to build a system that scales elegantly to handle unpredictable load and lets us "go wide" when we need to process higher-than-normal data volumes.

Drinking From The Firehose

At Gnip, we are continuously ingesting realtime firehoses of activities from multiple social media publishers (Twitter, Tumblr, and WordPress, to name a few). Due to breaking news stories or major events, the number of activities per second that we consume can spike significantly. We have seen, for instance, upwards of a sustained 20,000 Tweets per second coming in from Twitter during the Super Bowl or on election night, more than twice the average of 6,000-8,000 Tweets per second we see these days.

So far, we have always scaled each of our components to handle the highest imaginable load. This presents a challenge, though, since the "highest imaginable load" increases continuously as use of social media grows. Three years ago we saw an average of 1,000 Tweets per second; now we are at more than 6,000 Tweets per second on a typical day, and peak volumes have increased in a similar fashion. Rather than continuously adjust and plan ahead for what we think the "highest imaginable" load might be, we sought an approach that lets us handle whatever volumes come in the front door.

Bend, Don’t Break

Therefore, we have spent a lot of time and effort thinking about what we do when we get a spike that is higher than the highest imaginable load, say 50,000 Tweets a second. Who knows, perhaps the 2chan community will flash mob Twitter again, and we wouldn't want to miss that. We want to handle such an event gracefully, which for us concretely means never dropping an activity or missing an enrichment. (When we add metadata to an activity, like a Klout score or an unwound URL, we call that an 'enrichment'.)

So, inspired by this blog post, we decided to provision our inbound (or edge) infrastructure that consumes the firehoses to handle anything the publishers could ever throw at us, while letting the internal components be more flexible. This allows us to guarantee an even higher level of reliability without spending exponentially more resources just to cover rare peak events.

Borrowing some football terminology, we call this the "Bend, but don't break" strategy.

Kafka

The critical piece of open source software that allows us to do this easily is Kafka, since consumers can read from Kafka topics at their own pace; many other queuing systems are not as tolerant of slow consumers. Our internal components can apply back pressure and nothing is lost, as long as they don't fall behind the window of data retained in the Kafka topics. (The retention period is limited only by how much you'd like to spend on drives.)

The consequence for downstream components is that spikes are somewhat smoothed out. The following picture compares the volume we received from a publisher on the inbound edge of our infrastructure with the inbound volume of an internal component, during a load test with a sustained volume of 25k Tweets/second.

As you can see, the internal component handled 20k Tweets/second quite happily, but started applying back pressure at 25k Tweets/sec and therefore took a couple of minutes to drain the backlog until it had completely caught up with real-time. This load test helped us identify the need to further scale that component to handle a possible 25k Tweets/sec in realtime, so we increased capacity accordingly. But even though the spike was larger than expected, we did not experience any data loss or operational issues. Full data fidelity at all times is a major benefit of this approach and a requirement of our products.

Implementation

In order to make this work, we ensure that the apps never pull more work from Kafka than they can handle. For instance, in one step of our pipeline we extract the information required for calculating enrichments from activities (e.g. Tweet text for language classification) and then distribute that work to multiple worker hosts to parallelize the computation. The workers pull work from queues on a distributor app as fast as they can complete it.

Since we build our backend systems in Java, we leverage the functionality in java.util.concurrent quite extensively. For these queues we use LinkedBlockingQueue, so that if the workers can't keep up, the threads on the distributor block until the queues get drained. It is just as important to size the blocking queues properly when they are created:

LinkedBlockingQueue<EnrichmentRequest> requestQueue =
       new LinkedBlockingQueue<EnrichmentRequest>(MAX_QUEUE_SIZE);

Since reading from Kafka happens on the same thread, we naturally apply back pressure. MAX_QUEUE_SIZE depends mostly on how much memory you have available and how quickly you would want to apply back pressure.
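
As a rough sketch of that distributor loop (using the modern Kafka consumer API rather than whatever client we ran at the time; the topic name and the EnrichmentRequest payload shape are illustrative): requestQueue.put() blocks when the queue is full, which pauses polling and lets back pressure propagate to Kafka.

import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class Distributor {
    private static final int MAX_QUEUE_SIZE = 10_000; // sized by available memory and desired back-pressure point

    private final LinkedBlockingQueue<EnrichmentRequest> requestQueue =
            new LinkedBlockingQueue<EnrichmentRequest>(MAX_QUEUE_SIZE);

    public void run(KafkaConsumer<String, byte[]> consumer) throws InterruptedException {
        consumer.subscribe(Collections.singletonList("activities"));
        while (true) {
            for (ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofMillis(500))) {
                // Blocks when the queue is full; the poll loop stops and back
                // pressure naturally flows upstream to Kafka.
                requestQueue.put(new EnrichmentRequest(record.value()));
            }
        }
    }

    public static class EnrichmentRequest {
        final byte[] payload;

        EnrichmentRequest(byte[] payload) {
            this.payload = payload;
        }
    }
}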

One additional bit of work the distributor app has to do is park activities for a couple of seconds, to give the workers time to calculate the enrichments. Again, java.util.concurrent comes in handy for building a BlockingDelayQueue:

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Semaphore;

public class BlockingDelayQueue<T extends Delayed> {
    private final DelayQueue<T> delayQueue = new DelayQueue<T>();
    private final Semaphore capacitySemaphore;

    public BlockingDelayQueue(int capacity) {
        capacitySemaphore = new Semaphore(capacity);
    }

    public void put(T item) throws InterruptedException {
        // Blocks when the queue is at capacity, so the caller backs off automatically.
        capacitySemaphore.acquire();
        delayQueue.put(item);
    }

    public T take() throws InterruptedException {
        T item = delayQueue.take();
        capacitySemaphore.release();
        return item;
    }

    public int size() {
        return delayQueue.size();
    }
}


This class is built to support back pressure: if the queue is not getting drained, puts block and the application backs off automatically.
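
For the items themselves, anything going into the delay queue needs to implement java.util.concurrent.Delayed. A hypothetical wrapper that parks an enrichment request for a fixed number of milliseconds might look like this:

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayedEnrichmentRequest implements Delayed {
    private final long readyAtMillis;

    public DelayedEnrichmentRequest(long parkMillis) {
        this.readyAtMillis = System.currentTimeMillis() + parkMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining time this request should stay parked.
        return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}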

Scalability

Apart from Kafka's persistent queuing, the system also gains natural horizontal scalability out of the box, since internal components can be parallelized by consuming only a subset of partitions. In Kafka, each topic is spread across a configurable number of partitions in order to support horizontal scalability. Stay tuned to the blog for a deeper dive into what it takes to pump firehoses through Kafka and not miss a drop.

References

  1. http://kafka.apache.org

  2. http://mechanical-sympathy.blogspot.co.uk/2012/05/apply-back-pressure-when-overloaded.html

Building a Killer (Twitter) Search UI

Gnip is primarily an API company, providing its customers with API access to the social data activities they request and putting the onus of processing, analysis and display of the data on said customer.

In the case of our Search API product, many of our customers want to expose the functionality and control directly to their paying users, in a way that is similar to traditional search engines (Google, Twitter Search, etc). This type of integration is different from our HTTP Stream-based products, which require more behind-the-scenes management by our customers. So to demonstrate to our customers and prospects the type of applications they could build into their product suites using the Search API, we built Twitter Search on Rails.

I want to explain how I developed this bootstrap project for your use and why you would want to use it. The code may be Rails and CoffeeScript, but the concepts stand on their own. First, a demo:

Twitter Search on Rails Demo

Multiple Visualizations

There are many ways to slice and dice interesting tweets. Three ways our customers frequently do so include: trend analysis, a dashboard view of actual Tweet content, and geographic distribution.

Frequency over time

One request to Gnip’s Search Count API gives a nice history of frequency in the form of a line chart like this one.

Frequency of Black Friday tweets over time

You can see how I configured Highcharts to make the above chart in chart.coffee.

Geographic distribution

You can extract a great geographic distribution using the Profile Geo Enrichment mapped with MapBox:

Geo distribution of Black Friday tweets

Toss in a Marker Clusterer (for grouping geographically co-located points) and you’ll add another unique perspective to viewing the data.

Tweets that look like tweets

Visualizations are nice, but content is king™. We convert Activity Streams JSON into a tweet that conforms to Twitter's Developer Display Guidelines using a yet-to-be-announced project. That means entities like usernames and hashtags are linked; Retweet, Reply, and Favorite work through Twitter Web Intents; and tweets support RTL display, among other things.

When it comes to search, the query results have to stand out and behave the way a user would expect them to.

Decoupled Web Components

One can break the typical search app into fairly obvious pieces: the search form, the results, data retrieval, and so on. These pieces can operate independently of one another through the use of JavaScript events. This Pub/Sub model is perfect for decoupling parts of web apps.

(See GitHub Gist 7989857 for the Pub/Sub example.)

Not only that, but if you also inject a DocumentFragment or some other container to render into, you can unit/functional test this part of your web app independently! Check out the less trivial implementation in activityList.coffee.

Fast, Elegant Transitions

I think everyone would agree that they want a fast search experience. That doesn't mean it can't be pretty; we just have to avoid animations that are known to be slow. The HTML5Rocks article on High Performance Animations explains what is performant, and why, much better than I could. TL;DR: use CSS transitions, not JS, and only animate opacity and transforms (rotate, translate, scale).

(Interactive demo: hover to see the CSS transition.)

With that in mind, I found inspiration from Hakim El Hattab’s stroll.js when coming up with a simple but slick way to load search results.

(See GitHub Gist 7990148 for the search results transition example.)

About Search API on Rails

I originally wrote an application intended to show customers what they could build on top of Gnip’s Search API. It was such a hit that we had many requests from customers to license the code for building their own applications — so we decided to open-source it to facilitate integrations with the Search API.

We chose Rails here due to its popularity and, hell, we already know Rails. CoffeeScript's classes made it easier to reason about web components with an inheritance chain, and I like its syntax. Sass is our CSS preprocessor of choice here at Gnip.

This project isn't just some proof-of-concept; it was written deliberately and with care.

We hope you’ll find this background helpful when building your application on top of the Search API with Twitter Search on Rails! If you find an improvement, we welcome your contribution through GitHub. Enjoy!

Exploring S3 Read Performance

This blog post is a collaboration between Jud Valeski and Nick Matheson.

When you use petabytes as the unit to measure your S3 storage usage, you're dealing with a lot of data. Gnip has been a tremendously heavy S3 user since S3 came online, and it remains our large-scale durable storage solution.

Over the years, we’ve worked with a wide array of data access models to accomplish a variety of things. From backfilling a customer’s data outage from our archives, to creating low-latency search products, how and where we access data is an integral part of our thinking.

If you’ve ever worked with really large data sets you know that the main challenge around them is moving them from application A to application B. Sneakernet is often the best way as it turns out.

Some of our datasets on S3 are large enough that moving them around is impractical, so we've had to explore various data access patterns and approaches that use S3 as big blob backend storage.

One such approach is using HTTP byte-range requests against our S3 buckets to access just the blocks of data we're interested in. This separates the data request from the actual "file" (which may be inconveniently large to transfer across the wire in low-latency access scenarios). It does require an index into said S3 buckets, however, which needs to be calculated ahead of time.

Gnip recently experimented with this concept; here are the findings. We've run into a few folks over the years who considered using S3 in this manner, and we wanted to share our knowledge in hopes of furthering exploration of S3 as a blob storage device.
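
As a sketch of what a single ranged lookup looks like with the AWS SDK for Java (v1), assuming the byte offsets come from the precomputed index (this is illustrative, not our actual client code):

import java.io.IOException;
import java.io.InputStream;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;

public class RangedS3Reader {
    private final AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();

    // Fetch only the byte range [start, end] (inclusive) of the object, as given by the index.
    public byte[] readBlock(String bucket, String key, long start, long end) throws IOException {
        GetObjectRequest request = new GetObjectRequest(bucket, key).withRange(start, end);
        try (S3Object object = s3.getObject(request);
             InputStream content = object.getObjectContent()) {
            // Read fully and close promptly so the underlying connection returns to the pool.
            return IOUtils.toByteArray(content);
        }
    }
}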

The Setup

Our tests explored a few combinations of node (ec2 instance) count, thread count and byte range size. The node count was varied from 1 to 7, the thread count from 1 to 400 and the byte range size between 1KB and 10KB. The range lookups were performed randomly over a single bucket with roughly half a million keys and 100TB of data.

The Results

The investigation started with a focus on the scalability of a single node. The throughput of a single thread is not very compelling at ~8 lookups per second, but these lookups scale near linearly with increased thread count, even out to 400 threads on our c1.xlarge instance. At 400 threads making 10KB lookups, network saturation becomes an issue, as a sustained 40+ MB/s is high for a c1.xlarge in our experience.

The next phase of the investigation focused on scaling out by adding more nodes. For these tests there was a significant difference between the scalability of 1KB vs 10KB lookups. With 10KB lookups even scaling beyond 2 nodes yielded rapidly diminishing returns. In the context of this test, we were unable to find a similar limit for 1KB lookups although there was a slight slope change for the last data point (7 nodes). It should be noted that these are limits for our specific bucket/key/partition structure and access pattern and thus may not be representative of true S3 limits or other usage scenarios.

In addition to the scalability tests, we also tracked request latencies, with a focus on the 90th, 95th and 99th percentile times. Under light and moderate load, the 99th percentile times were both in the 450-500ms range. Under moderate and heavy load these degraded significantly, to 1s and 6s respectively. The frequency of read/connect timeouts under moderate/heavy load increased significantly as well. There was significant variance in the percentile results, both over time of day/week and minute to minute.

The Bottlenecks

Overall, this approach scaled better than we had initially anticipated but it did take some work on the client side to be as lean as possible. Things like keep-alive/connection pool tuning and releasing network resources as soon as possible (gotta read from/close those input streams) were all necessary to get to this level of throughput.

These tests started to bump into both per-node and aggregate bandwidth limits. On a per-node basis this is unlikely to be of practical concern, since at this level of throughput CPU was at 50% utilization and any 'real application' logic would likely become the actual bottleneck, effectively easing off the network. We will continue to investigate the aggregate performance levels, as we suspect this has more to do with our particular data distribution, usage, and S3 partitioning, for which we have done limited tuning/investigation.

These tests should also be taken with a grain of salt, as they represent a sunny-day scenario that likely will not hold under the variety of conditions we see running production systems at Amazon.