Enriching With Redis Part II: Schema Happens

As a follow-up to our first post about Redis at Gnip, we'll discuss how we built on our basic client-side Redis usage.

Client Side Woes

Early on, our Redis implementation was in great shape. It was performant, and we had failover, replication, and health checks. However, a lot of our client code for using Redis had become pretty low level. For instance, code to read or write an enrichment had to worry about, and implement, things like: Which key is this going to live in? Is it a hash? A set? How do I serialize/deserialize this value? What happens if it fails, or I need to retry? Does this operation need to be pipelined? What's the domain logic around the data?

Operational Woes

Another issue was handling iterations and evolutions in how we store the data, where we store it, and how we relate it to other things. Simply clearing the cache wasn't an option, since that would mean unreasonable downtime in our real-time pipeline of enrichments. Instead we versioned access to our Redis instance by explicitly branching around a config option. This allowed us to, say, write into both the "old" and "new" cache, and then switch reading over to "new" once it was fully populated. However, all the code to deal with reading, writing and versioning quickly overwhelmed the domain intent of what we were doing, and made it difficult to reason about correct behavior.
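
To make the idea concrete, here is a minimal sketch of that config-driven, dual-write branching. The class and config names are hypothetical, not our actual code:

interface EnrichmentCache {
    void put(int geoEntityId, byte[] value);
    byte[] get(int geoEntityId);
}

// Hypothetical sketch: dual-write during a cache migration, and flip reads to the
// new layout via a config option once it is fully populated.
class VersionedEnrichmentCache implements EnrichmentCache {
    private final EnrichmentCache oldLayout;
    private final EnrichmentCache newLayout;
    private final boolean readFromNew; // driven by a config option

    VersionedEnrichmentCache(EnrichmentCache oldLayout,
                             EnrichmentCache newLayout,
                             boolean readFromNew) {
        this.oldLayout = oldLayout;
        this.newLayout = newLayout;
        this.readFromNew = readFromNew;
    }

    @Override
    public void put(int geoEntityId, byte[] value) {
        // Write both layouts while the migration is in flight.
        oldLayout.put(geoEntityId, value);
        newLayout.put(geoEntityId, value);
    }

    @Override
    public byte[] get(int geoEntityId) {
        // Reads switch to the new layout once it is fully populated.
        return readFromNew ? newLayout.get(geoEntityId) : oldLayout.get(geoEntityId);
    }
}

Even in this toy form you can see how quickly the versioning mechanics start to crowd out the domain logic.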

Realization

What we noticed was that we implicitly had a schema defined in between all the serialization, ordering, and what-goes-where logic. Redis itself is schema-less (just commands against keys), but more often than not your application-specific usage of Redis does have a schema: a relationship between keys, the values they hold, and how your application interprets them.

Schema

We decided to formalize our schemas in source. You define a set of "keys", and then perform operations against those keys. For instance, this is how we logically store data in our profile location cache: "I want to be able to quickly look up a given Geo Entity by its Id. I'd also like to compact the entities into a set of hashes for reasonable memory performance, dividing the Id by 1000 to get the top-level hash key and using the remainder as the field inside that hash. The Geo Entities I'll be storing are plain Google Protocol Buffers. Furthermore, I want to track both the total count of Geo Entities and each component hash key of the overall bucket of Geo Entities." Woo! What a mouthful! Imagine all the mechanical drudgery needed to remember all those details. In our schema definition, this is what it looks like:

geoEntityBucketDefinition = new ComputedBucketDefinition<Integer, Enrichments.GeoEntity>()
                .prefix("gnip:GtGe")
                .keyStrategy(new DivideIntegerBy(1000))
                .transformer(new GpbTransformer<Integer, Enrichments.GeoEntity>(
                        false,
                        new GpbTransformerBuilder<Enrichments.GeoEntity>() {
                            @Override
                            public Enrichments.GeoEntity from(byte[] bytes) 
                              throws InvalidProtocolBufferException {
                                return Enrichments
                                  .GeoEntity
                                  .newBuilder()
                                  .mergeFrom(bytes).build();
                            }
                        }))
                .trackComponentKeys("gnip:MetaBuckets")
                .trackCounts("gnip:NumOfGeoEntities");

Pipeline vs Direct Access

One of the biggest performance wins when using Redis is utilizing the pipelining feature. Baking that into our Schema implementation was a requirement from day one. Before we can see how the schema helps with this, a detour into Jedis usage is required.

Pipelining in Jedis

Pipelining in Redis is simply sending multiple commands in one request, and then reading the responses back. But what does this look like in a client library? Let’s say we’re working with a key called foo. With Jedis we can simply say:

Jedis j = new Jedis();
String value = j.get("foo");

Ok, that’s pretty great. But what if foo were one of N keys that we wanted to pipeline? Well we’d do something like the following:

Jedis j = new Jedis();
Pipeline p = j.pipelined();

Response<String> valueResponse = p.get("foo");

p.sync();

String value = valueResponse.get();

Here you can see that Pipeline.get(String) returns a String wrapped in a Response type. A Response is simply a Promise or Future: it's a handle to the result of a command that hasn't run yet. When we invoke p.sync() we're telling Redis "hey, we're not sending any more commands, please process what I sent you". Jedis then reads the responses and fills in the values underneath the Response objects, so that Response<>.get() actually gives you data.

Pipelining with Schema

While the concept behind pipelines in Jedis is pretty straightforward, managing all of that state can easily crowd out the intent of a piece of code with a bunch of mechanics. Instead, the Schema has pipelining baked in, allowing you to access the same keys in the same way without having to track individual Response objects or remember when it's ok to call get() on them. How? Every KeyDefinition type has the ability to build a pipelined or a direct version of itself. This also lets us perform any action against the schema in the context of our retry-aware, failover-enabled connection pool. Let's say this is our schema definition:

public class MySchema
  extends RedisSchema<MySchema.PipelineConnection, MySchema.DirectConnection> {
    private final ComputedBucketKeyDefinition geoEntityBucketDefinition;

    public MySchema() {
        geoEntityBucketDefinition = // Definition from above
    }

    @Override
    protected PipelineConnection buildPipeline(Pipeline p) {
        return new PipelineConnection(
            geoEntityBucketDefinition.buildPipeline(p)
        );
    }

    @Override
    protected DirectConnection buildDirect(Jedis j) {
        return new DirectConnection(
            geoEntityBucketDefinition.buildDirect(j)
        );
    }

    public static class PipelineConnection {
        private final PipelineBucketKey geoEntityBucketKey;

        PipelineConnection(PipelineBucketKey geoEntityBucketKey) {
            this.geoEntityBucketKey = geoEntityBucketKey;
        }

        public PipelineBucketKey geoEntityBucket() {
            return geoEntityBucketKey;
        }
    }

    // Same pattern for DirectConnection
}

Here we provide a way to build a pipeline or direct connection from our schema, by asking our definition to build the right version of its key and then exposing that for use.

Usage

Enrichments.GeoEntity geoEntity = schema.pipelineResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Deferred<Enrichments.GeoEntity>>() {
        @Override
        public Deferred<Enrichments.GeoEntity> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(1);
        }
    }
);

Map<Integer, Enrichments.GeoEntity> geoEntitiesById = schema.pipelineMapResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Map<Integer, Deferred<Enrichments.GeoEntity>>>() {
        @Override
        public Map<Integer, Deferred<Enrichments.GeoEntity>> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(geoIds);
        }
    }
);

Schema Versioning

A topic we touched on last time was the idea of "versioning" our schema. The idea is to let us evolve the keys we use while minimizing downtime and confusion. Our solution is really a pattern for how we use our Schema definitions. First we append a version number to our schema definition:

public class MySchemaV0 { // etc }

Then we define an interface for what we want to do against the abstract notion of our “cache”:

public interface GeoEntityService {
    void put(GeoEntity geoEntity);
    GeoEntity get(Integer id);
}

And finally we glue our specific schema version to the interface:

public class GeoEntityServiceV0 implements GeoEntityService {
    public GeoEntityService(MySchemaV0 schema) { }
    // impls here
}

We can then control which "version" we bind to the abstract interface with a configuration option. For instance, we may stand up a second writer using V<N+1> while all the readers use V<N>. Once V<N+1> is populated, we can flip all the readers over to it just by changing their config.
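
As a rough sketch, the binding might look something like the following. The factory and the V1 classes here are hypothetical, purely to illustrate the pattern:

public class GeoEntityServiceFactory {
    // Hypothetical wiring: pick the schema-backed implementation from a config value.
    public static GeoEntityService forVersion(int cacheVersion) {
        switch (cacheVersion) {
            case 0:
                return new GeoEntityServiceV0(new MySchemaV0());
            case 1:
                return new GeoEntityServiceV1(new MySchemaV1()); // the "next" version
            default:
                throw new IllegalArgumentException("Unknown cache version: " + cacheVersion);
        }
    }
}

Readers and writers each get their own version number in config, which is what lets a V<N+1> writer warm the new keys while readers stay on V<N>.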

Enriching With Redis: Part 1

We’ve talked previously about how we use Redis to Track Cluster Health for our Historical PowerTrack product.

Now we’ll be exploring how we enrich with Redis.

Adding the Bling

For every activity that flows through Gnip, we add extra metadata that provides additional information that our customers are interested in. We call this extra metadata Enrichments, and it’s what allows customers to filter on URLs or Profile Location.

As you can imagine, enriching every activity in realtime requires a lot of care to make sure that we don't add significant delay to the realtime stream. For an enrichment such as Profile Location, it's hard to imagine doing all the work of matching the user's bio location for every activity as we see it.

Instead, we depend on “pre-computing” as much as we can, caching that pre-chewed data, and then in realtime quickly fetching and enriching a particular activity.

We still need something low-latency and fast to serve as our cache, and Redis fits that bill pretty nicely.

Profile Geo

For this post, we'll use our Profile Geo enrichment as an example for all of our work. If you're not familiar with this enrichment you can read more about it here, but for a quick overview: this enrichment takes the opaque, user-defined profile location string from each activity and attempts to find a matching, structured geo entity. This adds metadata like name, timezone, long/lat center point, and hierarchical info (e.g. Boulder is part of Colorado). Our PowerTrack customers are then able to filter on this new data.

Infrastructure

Non-realtime processes are responsible for crunching the relevant enrichment data and storing it in the Redis cache. The data is stored as Google Protocol Buffers for quick fetching and parsing, and to reduce memory pressure in Redis.

[Diagram: locaterator architecture]

For Profile Geo, we have an app that sniffs for geotagged activities. When it receives one, it attempts to geo-locate the location string in that user's profile. If the geo entity we find for the profile location string is close to the longitude/latitude in the activity, then we store the mapping between that profile location string and the given geo entity. If that mapping already existed, we increase a "score" for that mapping. Later, when our worker process goes to perform the actual enrichment, it pulls the full set of mappings for that profile location string and chooses the best one based on score and other signals.
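
A minimal sketch of that scoring step, assuming one sorted set of candidate geo entity ids per normalized profile location string (the key layout and class names here are illustrative, not our production schema):

import redis.clients.jedis.Jedis;

// Illustrative sketch: members are geo entity ids, and the score counts how often
// a geotagged activity "agreed" with that profile-location-to-geo-entity mapping.
public class ProfileLocationScorer {
    private final Jedis jedis;

    public ProfileLocationScorer(Jedis jedis) {
        this.jedis = jedis;
    }

    public void recordAgreement(String normalizedProfileLocation, long geoEntityId) {
        String key = "profile_location_candidates:" + normalizedProfileLocation; // hypothetical key
        // ZINCRBY adds the member with score 1 if it is new, otherwise bumps its score.
        jedis.zincrby(key, 1.0, Long.toString(geoEntityId));
    }
}

The enrichment worker can then read the whole candidate set back (e.g. with ZREVRANGEBYSCORE) and pick the best mapping.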

Redis really shines in this type of low-latency, batched workload. The main Redis instance in the Profile Location cache performs about 20k operations per second over a working set of 2.8 GB.

Redis meet Gnip, Gnip meet Redis

Each Redis host also runs a Gnip-specific app that ties the Redis host into our own health checks and configuration rollout. The app also does a few extra things, like periodically uploading RDBs to S3 as insurance against losing an entire machine.

Jedis

We make use of the excellent Jedis library to access all the data that we store in our various Redis caches.

Failover

We built in an idea of "failover". All of our Redis instances participate in master-slave replication. Our "failover" is simple: given a list of priority-sorted Redis hosts, always try to use the highest-priority host.

For example, if the master goes down, every client fails over to querying the slave (the app responsible for populating the cache is never configured to see the slave, so it simply stops writing), and when the master comes back online, everyone starts querying it again.

This lets us address operational things such as network hiccups, or node failures, but it also lets us do things like upgrade Redis live without having to stop delivering enrichments.

This is a trade-off between availability and correctness. We're ok with being a little bit stale if it means we can still serve a reasonable answer instead of making the entire enrichment unavailable for everyone.
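
A simplified sketch of that priority-order selection follows; here the health check is just a PING, while the real version also has to deal with pooling, timeouts, and retries:

import java.util.List;
import redis.clients.jedis.Jedis;

// Simplified sketch: walk a priority-sorted list of hosts and use the first one
// that answers a PING.
public class PriorityHostSelector {
    public Jedis highestPriorityAvailable(List<String> hostsByPriority, int port) {
        for (String host : hostsByPriority) {
            Jedis candidate = new Jedis(host, port);
            try {
                candidate.ping();  // throws if the host is unreachable
                return candidate;  // first healthy host wins
            } catch (Exception e) {
                candidate.close();
            }
        }
        throw new IllegalStateException("No Redis host available");
    }
}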

Optimizations

Pipelining

We are also able to get such good performance by using a feature called pipelining. Redis is a request/response protocol: send a command, wait, read the response. While this is simple, a network round trip per command is not going to give you good throughput when you need to keep up with fast-moving streams (like the Twitter firehose). But Redis also ships with a time-honored feature, pipelining, whereby you send multiple commands at once and then wait for the responses to all of those commands. This feature allows us to bulk write to and read from our caches.
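
In Jedis terms, a bulk pipelined read looks roughly like this (a sketch, not our exact code):

import java.util.ArrayList;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;

// Sketch: queue many GETs on one connection, flush them in a single round trip,
// then read every response back.
public class BulkReader {
    public List<String> readAll(Jedis jedis, List<String> keys) {
        Pipeline pipeline = jedis.pipelined();
        List<Response<String>> pending = new ArrayList<>();
        for (String key : keys) {
            pending.add(pipeline.get(key)); // queued, not yet answered
        }
        pipeline.sync(); // send everything and read the responses
        List<String> values = new ArrayList<>();
        for (Response<String> response : pending) {
            values.add(response.get()); // only safe after sync()
        }
        return values;
    }
}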

Hashing

When we first started proofing out a cache with Redis, we used it in a pretty straightforward manner. Each individual geo entity was given its own key (e.g. `geo_entity:`), with the value being the GPB representing all the interesting things about the entity (name, long/lat, etc.).

For example, a geo entity with an id of '1235594' would get the key `geo_entities:1235594`. The value for that key would be the binary protobuf value.

However, this naive approach is wasteful with memory. With the knowledge that Redis stores hashes efficiently and some inspiration from this blog post, we tweaked our data model to get better memory performance.

We decided to take each geo entity Id, divide it by 1000, and use the quotient to build the key. We treat that key as a hash, and use the remainder (Id - (quotient * 1000)) as the field within that hash. By creating many hashes, we give Redis a chance to encode the fields and values within each hash more efficiently, which dramatically reduces our memory usage. (You can read more about Redis memory optimization here.)

Again working with a geo entity with an id of '1235594':

The original plan is simple; the key is `geo_entities:1235594`, and we do a simple SET (the value is the binary protobuf):

SET 'geo_entities:1235594' <binary GPB value>

Using hashes, things are a little bit more complicated:

key_id = 1235594 / 1000 = 1235 # integer division
field_id = 1235594 - (1235 * 1000) = 594

Given that, we just do an HSET:

HSET 'geo_entities:1235' '594' <binary GPB value>

Another example, with geo entity id 1235777:

HSET 'geo_entities:1235' '777' <binary GPB value>
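
In code, the bucketing math is tiny. A sketch using Jedis (the class name and key prefix are illustrative):

import redis.clients.jedis.Jedis;

// Sketch of the bucketing scheme: divide the id by 1000 for the hash key and
// use the remainder as the field inside that hash.
public class GeoEntityHashStore {
    private static final int BUCKET_SIZE = 1000;

    public void put(Jedis jedis, int geoEntityId, byte[] gpbBytes) {
        int bucket = geoEntityId / BUCKET_SIZE;  // e.g. 1235594 -> 1235
        int field = geoEntityId % BUCKET_SIZE;   // e.g. 1235594 -> 594
        jedis.hset(("geo_entities:" + bucket).getBytes(),
                   Integer.toString(field).getBytes(),
                   gpbBytes);
    }

    public byte[] get(Jedis jedis, int geoEntityId) {
        int bucket = geoEntityId / BUCKET_SIZE;
        int field = geoEntityId % BUCKET_SIZE;
        return jedis.hget(("geo_entities:" + bucket).getBytes(),
                          Integer.toString(field).getBytes());
    }
}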

However, the payoff is worth it. At time of posting, we hold about 3.5 million individual (normalized) user-generated profile location mappings to a candidate set of geo entities. Each member of that set has associated metadata that we use to improve and refine the system over time.

All told that consumes about 3G of memory in Redis. On average that is around 100 bytes per unique user profile location. That is a pretty great use of space!

Same Time, Same Place

Next time, we’ll discuss another abstraction we’ve built over Jedis to ease interacting with Redis from our client applications.

Tracking Cluster Health With Redis

Historical PowerTrack Background

Our new Historical PowerTrack for Twitter allows you to run filtered searches across the entire Twitter archive. At its heart, it is a cluster of machines where each machine runs a handful of Resque workers that do all of the heavy lifting of processing and filtering the data.

Like any good production application, we have an abundant amount of health reporting in place to monitor problems and quickly respond to issues.

Resque itself runs on top of Redis, an open source data structure server. Since we already have our Redis infrastructure set up to power Resque, we can leverage Redis to track our workers' health information.

One of the things we track per worker is an "error rate": the number of errors seen in the last 30 minutes. Once that climbs above a threshold, we send an alert so that we can examine why that worker is having issues.

Implementation 0

Using Resque Job Hooks, a worker is notified when it fails to process a job:

module HealthChecks
  def on_failure_record_health_status
    # Redis code will go here
  end
end

Inside that callback, we then simply create a new key to represent the failure:

setex gnip:health:worker:<worker_id>:error_instance:<unix_timestamp>, 30*60, ''

We also give the key an expiry of 30 minutes when we create it.

Then in our health checking code we can simply get the count of error_instances for a given worker_id:

keys gnip:health:worker:<worker_id>:error_instance:*

Since an error instance key dies after 30 minutes, the size of the resulting set from `KEYS` will tell us the error rate! Huzzah!

While this solution works and was quick to get up and running, there was a catch: namely, this tidbit from the Redis documentation on the KEYS command:

“Warning: consider KEYS as a command that should only be used in production environments with extreme care. It may ruin performance when it is executed against large databases. This command is intended for debugging and special operations, such as changing your keyspace layout. Don’t use KEYS in your regular application code. If you’re looking for a way to find keys in a subset of your keyspace, consider using sets.”

Our first implementation is workable, but not very performant or safe. Luckily the docs give us a clue in the warning message: sets.

Current Implementation

Redis describes itself as a data structure server. This means that the value for a given key can be a basic string or number, but can also be a Hash, List, Set, or Sorted Set.

What we're interested in are Sorted Sets. They work just like normal Sets: you add members, and they must be unique (just like mathematical sets); you can union, intersect, and difference them. However, each member also has an associated, non-unique score, and Redis automatically keeps the set sorted for you by that score.

Sorted Sets have an associated command that is crucial for the current implementation:

ZCOUNT key min max – Returns the number of elements in the sorted set at key with a score between min and max.

ZCOUNT returns the count of elements whose *score* falls in the given range, not elements whose member value does. This is pretty efficient too, since Redis is already holding the set sorted by score.

The current implementation now works like this:

Every time an error occurs, we record the Unix timestamp (seconds since Jan 1st 1970).

We then add a new member to the error rate set for a worker, where both the member and the score are the Unix timestamp. We also reset the expiration on the key to 30 minutes (fewer things to manually clean up later).

zadd gnip:health:worker:<worker_id>:error_instances unix_ts, unix_ts
expire gnip:health:worker:<worker_id>:error_instances 30*60

To calculate the error rate for a worker we can then do:

zcount gnip:health:worker:<worker_id>:error_instances, unix_timestamp_30_minutes_ago, unix_timestamp_now

Because our score is a timestamp, old errors “fall out” of our view when calculating the error rate. Then after 30 minutes of no errors, the key disappears.

Redis commands are also very forgiving of state, so a ZCOUNT against a non-existent key returns 0, which is usually going to be the happy path.

What's really great is that instead of one worker generating multiple keys to represent errors, we now only create one key per worker to represent the error rate.

Other Considerations

There are a few other ways we could have implemented this feature. We could have used millisecond timestamps, which would avoid two errors in the same second colliding on the same set member, but in our app it's highly unlikely that multiple errors would be thrown within a second.

We could also have stored the exception message (or even the full stack trace) as the member in the set instead of the timestamp (perhaps prefixed with the timestamp to make it unique), and then used ZRANGEBYSCORE, just as we used ZCOUNT, to get the list of errors from the last 30 minutes.
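
For example, pulling the error entries from the last 30 minutes would look something like this (same key and placeholder naming as above):

zrangebyscore gnip:health:worker:<worker_id>:error_instances, unix_timestamp_30_minutes_ago, unix_timestamp_now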

While this (and other health checks) were written while developing our product, they are not specific to it. Would you, dear Reader, find it helpful if Resque tracked things like error rate, sequential failures, etc. per worker (either in Resque core or an add-on gem)?

To wrap up, Redis has a bunch of great commands and data types to solve real problems quickly and easily! Consider this pattern the next time you’re looking to keep track of the rate of events in a given time window.