Enriching With Redis Part II: Schema Happens

As a follow-up to our first post about Redis at Gnip, we'll discuss how we built on our basic client-side Redis usage.

Client Side Woes

Early on, our Redis implementation served us well: it was performant, and we had failover, replication, and health checks. However, a lot of our client code for using Redis had become pretty low level. For instance, code to read or write an enrichment had to worry about, and implement, things like: Which key is this going to live in? A hash? A set? How do I serialize and deserialize this value? What happens if it fails, or if I need to retry? Does this operation need to be pipelined? What's the domain logic around the data?
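To make that concrete, here is a rough sketch (not our actual code) of what a single low-level read ends up looking like when every call site handles those mechanics itself. The Id and key layout mirror the examples later in this post, but the exact key format here is purely illustrative:

int geoEntityId = 123456; // example Id, purely illustrative
Jedis jedis = new Jedis();
try {
    // The caller has to know the key layout: hashes bucketed by Id / 1000...
    // (the ":" separator in the key is an assumption for this sketch)
    String hashKey = "gnip:GtGe:" + (geoEntityId / 1000);
    String field = String.valueOf(geoEntityId % 1000);
    byte[] bytes = jedis.hget(hashKey.getBytes(), field.getBytes());

    // ...and how the value is serialized (a Google Protocol Buffer here)...
    Enrichments.GeoEntity geoEntity = (bytes == null)
        ? null
        : Enrichments.GeoEntity.newBuilder().mergeFrom(bytes).build();
} catch (InvalidProtocolBufferException | JedisConnectionException e) {
    // ...and what failure, retry, and failover should look like.
} finally {
    jedis.disconnect();
}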

Operational Woes

Another issue was handling iterations and evolutions of how we store the data, where we store it, and how we relate it to other things. Simply clearing the cache wasn't an option, since that would mean unreasonable downtime in our real-time pipeline of enrichments. Instead we versioned access to our Redis instance by explicitly branching on a config option. This allowed us to, say, write into both the "old" and "new" cache, and then switch reading over to "new" once it was fully populated. However, all the code to deal with reading, writing, and versioning quickly overwhelmed the domain intent of what we were doing, and made it difficult to reason about correct behavior.
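In code, that dual-write phase looked something like the following sketch; the cache wrappers and config flags are stand-ins for illustration, not our actual classes:

// Illustrative sketch of config-driven dual writes during a cache migration.
// oldCache / newCache wrap the "old" and "new" key layouts; the flags come from config.
public void put(Enrichments.GeoEntity geoEntity) {
    oldCache.put(geoEntity);        // readers still depend on the old layout
    if (writeToNewCache) {          // config flag: start back-filling the new layout
        newCache.put(geoEntity);
    }
}

public Enrichments.GeoEntity get(Integer id) {
    // Once the new cache is fully populated, flip this flag and readers move over.
    return readFromNewCache ? newCache.get(id) : oldCache.get(id);
}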

Realization

What we noticed was that we implicitly had a schema, defined in between all the serialization, ordering, and what-goes-where logic. Redis itself is schema-less (just commands against keys), but more often than not your application-specific usage of Redis does have a schema: a relationship between keys, the values they hold, and how your application interprets them.

Schema

We decided to formalize our schemas in source: you define a set of "keys", and then perform operations against those keys. For instance, this is how we logically store data into our profile location cache: "I want to be able to quickly look up a given Geo Entity by its Id. I'd also like to compact them into a set of hashes, for reasonable memory performance, by dividing the Id by 1000 to get the top-level hash and using the remainder as the field inside that hash. The Geo Entities I'll be storing are plain Google Protocol Buffers. Furthermore, I want to track both the total count of Geo Entities and each component hash key of the overall bucket of Geo Entities." Woo, what a mouthful! Imagine all the mechanical drudgery needed to remember all those details. In our schema definition, this is what it looks like:

geoEntityBucketDefinition = new ComputedBucketDefinition<Integer, Enrichments.GeoEntity>()
                .prefix("gnip:GtGe")
                .keyStrategy(new DivideIntegerBy(1000))
                .transformer(new GpbTransformer<Integer, Enrichments.GeoEntity>(
                        false,
                        new GpbTransformerBuilder<Enrichments.GeoEntity>() {
                            @Override
                            public Enrichments.GeoEntity from(byte[] bytes) 
                              throws InvalidProtocolBufferException {
                                return Enrichments
                                  .GeoEntity
                                  .newBuilder()
                                  .mergeFrom(bytes).build();
                            }
                        }))
                .trackComponentKeys("gnip:MetaBuckets")
                .trackCounts("gnip:NumOfGeoEntities");
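To make the key strategy concrete: with DivideIntegerBy(1000), a Geo Entity with Id 123456 is bucketed by the quotient 123 and stored under field "456" inside that hash. A minimal sketch of what such a strategy boils down to (the exact key format is an assumption for illustration):

public class DivideIntegerBy {
    private final int divisor;

    public DivideIntegerBy(int divisor) {
        this.divisor = divisor;
    }

    // e.g. prefix "gnip:GtGe", Id 123456 -> hash key "gnip:GtGe:123"
    public String hashKey(String prefix, Integer id) {
        return prefix + ":" + (id / divisor);
    }

    // e.g. Id 123456 -> field "456"
    public String field(Integer id) {
        return String.valueOf(id % divisor);
    }
}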

Pipeline vs Direct Access

One of the biggest performance wins when using Redis is utilizing the pipelining feature. Baking that into our Schema implementation was a requirement from day one. Before we can see how the schema helps with this, a detour into Jedis usage is required.

Pipelining in Jedis

Pipelining in Redis is simply sending multiple commands in one request, and then reading the responses back. But what does this look like in a client library? Let’s say we’re working with a key called foo. With Jedis we can simply say:

Jedis j = new Jedis();
String value = j.get("foo");

OK, that's pretty great. But what if foo were one of N keys that we wanted to pipeline? Well, we'd do something like the following:

Jedis j = new Jedis();
Pipeline p = j.pipelined();

Response<String> valueResponse = p.get("foo");

p.sync();

String value = valueResponse.get();

Here you can see that Pipeline.get(String) returns a String wrapped in a Response type. A Response is simply a Promise or Future: a handle to the result of a command that hasn't been processed yet. When we invoke p.sync() we're telling Redis "hey, we're not sending any more commands; please process what I sent you." Jedis then parses the response and fills in the values underneath the Response objects, so that Response<>.get() actually gives you data.
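With N keys the pattern is the same: queue up all the gets, call sync() once, and only then read each Response. A minimal sketch:

Jedis j = new Jedis();
Pipeline p = j.pipelined();

// Queue up a GET per key; none of the Responses are readable yet.
Map<String, Response<String>> responses = new HashMap<String, Response<String>>();
for (String key : Arrays.asList("foo", "bar", "baz")) {
    responses.put(key, p.get(key));
}

p.sync(); // flush the pipeline and read back all the replies in one go

for (Map.Entry<String, Response<String>> entry : responses.entrySet()) {
    String value = entry.getValue().get(); // safe to call get() only after sync()
}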

Pipelining with Schema

While the concept behind pipelines in Jedis is pretty straightforward, managing all of that state can easily crowd out the intent of a piece of code with a bunch of mechanics. Instead, the Schema has pipelining baked in, allowing you to access the same keys in the same way without having to worry about tracking individual Response objects, or when it's okay to call get() on them. How? Every KeyDefinition type has the ability to build a pipelined or a direct version of itself. This also lets us perform any action against the schema in the context of our retry-aware, failover-enabled connection pool. Let's say this is our schema definition:

public class MySchema
  extends RedisSchema<MySchema.PipelineConnection, MySchema.DirectConnection> {
    private final ComputedBucketDefinition<Integer, Enrichments.GeoEntity> geoEntityBucketDefinition;

    public MySchema() {
        geoEntityBucketDefinition = // Definition from above
    }

    @Override
    protected PipelineConnection buildPipeline(Pipeline p) {
        return new PipelineConnection(
            geoEntityBucketDefinition.buildPipeline(p)
        );
    }

    @Override
    protected DirectConnection buildDirect(Jedis j) {
        return new DirectConnection(
            geoEntityBucketDefinition.buildDirect(j)
        );
    }

    public static class PipelineConnection {
        private final PipelineBucketKey geoEntityBucketKey;

        PipelineConnection(PipelineBucketKey geoEntityBucketKey) {
            this.geoEntityBucketKey = geoEntityBucketKey;
        }

        public PipelineBucketKey geoEntityBucket() {
            return geoEntityBucketKey;
        }
    }

    // Same pattern for DirectConnection
}

Here we provide a way to build a pipeline or direct connection from our schema, by just asking our definition to build the right version of its key and then exposing that for use.
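For completeness, here's a sketch of what the direct side of that pattern might look like; DirectBucketKey is a stand-in name for whatever type buildDirect(Jedis) returns:

    public static class DirectConnection {
        private final DirectBucketKey geoEntityBucketKey;

        DirectConnection(DirectBucketKey geoEntityBucketKey) {
            this.geoEntityBucketKey = geoEntityBucketKey;
        }

        public DirectBucketKey geoEntityBucket() {
            return geoEntityBucketKey;
        }
    }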

Usage

Enrichments.GeoEntity geoEntity = schema.pipelineResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Deferred<Enrichments.GeoEntity>>() {
        @Override
        public Deferred<Enrichments.GeoEntity> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(1);
        }
    });

Map<Integer, Enrichments.GeoEntity> geoEntitiesById = schema.pipelineMapResult(
    new SchemaConnectionResultAction<ProfileLocationRedisSchemaV2.PipelineConnection,
                                     Map<Integer, Deferred<Enrichments.GeoEntity>>>() {
        @Override
        public Map<Integer, Deferred<Enrichments.GeoEntity>> execute(
                ProfileLocationRedisSchemaV2.PipelineConnection connection) {
            return connection.geoEntityBucket().get(geoIds);
        }
    });

Schema Versioning

A topic we touched on last time was the idea of "versioning" our schema. The idea is to let us evolve the keys we use while minimizing downtime and confusion. Our solution is less a feature of the schema itself and more a pattern for how we use our Schema definitions. First, we append a version number to our schema definition:

public class MySchemaV0 { // etc }

Then we define an interface for what we want to do against the abstract notion of our “cache”:

public interface GeoEntityService {
    void put(GeoEntity geoEntity);
    GeoEntity get(Integer id);
}

And finally we glue our specific schema version to the interface:

public class GeoEntityServiceV0 implements GeoEntityService {
    public GeoEntityServiceV0(MySchemaV0 schema) { }
    // impls here
}

We can then control which "version" we bind to the abstract interface with a configuration option. For instance, we may stand up a second writer on V<N+1> while all the readers use V<N>. Once V<N+1> is populated, we can flip all the readers over to it just by changing their config.
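A sketch of what that wiring might look like, assuming a simple string-valued config option and a hypothetical V1 alongside V0:

// Illustrative factory: bind the abstract interface to a concrete version via config.
// The config key and the "v1" classes are assumptions for the example.
public GeoEntityService buildGeoEntityService(Config config) {
    if ("v1".equals(config.getString("geoEntityCache.version"))) {
        return new GeoEntityServiceV1(new MySchemaV1()); // the new layout being rolled out
    }
    return new GeoEntityServiceV0(new MySchemaV0());     // the current layout
}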