Tracking Cluster Health With Redis

Historical PowerTrack Background

Our new Historical PowerTrack for Twitter allows you to run filtered searches across the entire Twitter archive. At its heart, it is a cluster of machines where each machine runs a handful of Resque workers that do all of the heavy lifting of processing and filtering the data.

Like any good production application, we have extensive health reporting in place to monitor problems and respond to issues quickly.

Resque itself runs on top of Redis, an open source data structure server. Since we already have our Redis infrastructure set up to power Resque, we can leverage Redis to track our workers’ health information.

One of the things we track per worker is an “error rate”.  The error rate is the number of errors seen in the last 30 minutes. Once that climbs above a threshold, we then send an alert so that we can examine why that worker is having issues.

Implementation 0

Using Resque job hooks, a worker is notified when it fails to process a job.

module HealthChecks
  def on_failure_record_health_status(exception, *args)
    # Redis code will go here
  end
end

Inside that callback, we then simply create a new key to represent the failure:

setex gnip:health:worker:<worker_id>:error_instance:<unix_timestamp>, 30*60, ''

We also give the key an expiry of 30 minutes when we create it.
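
Putting the pieces together, the hook body might look roughly like this. It’s a sketch only: the redis connection and worker_id helpers shown here are assumptions, not our exact code.

def on_failure_record_health_status(exception, *args)
  key = "gnip:health:worker:#{worker_id}:error_instance:#{Time.now.to_i}"
  # SETEX creates the key and sets its 30-minute expiry in a single step
  redis.setex(key, 30 * 60, '')
end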

Then in our health checking code we can simply get the count of error_instances for a given worker_id:

keys gnip:health:worker:<worker_id>:error_instance:*

Since an error instance key dies after 30 minutes, the number of keys returned by `KEYS` tells us the error rate! Huzzah!
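
In Ruby, that check might look something like this (again just a sketch, with the same assumed redis helper):

def error_rate(worker_id)
  # every matching key is an error from the last 30 minutes,
  # since older keys have already expired
  redis.keys("gnip:health:worker:#{worker_id}:error_instance:*").size
end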

While this solution works, and was quick to get up and running, there was a catch; namely this tidbit from the Redis documentation on the KEYS command:

“Warning: consider KEYS as a command that should only be used in production environments with extreme care. It may ruin performance when it is executed against large databases. This command is intended for debugging and special operations, such as changing your keyspace layout. Don’t use KEYS in your regular application code. If you’re looking for a way to find keys in a subset of your keyspace, consider using sets.”

Our first implementation is workable, but not very performant or safe. Luckily the docs give us a clue in the warning message: sets.

Current Implementation

Redis describes itself as a data structure server. This means that the value for a given key can be a basic string or number, but can also be a Hash, List, Set, or Sorted Set.

What we’re interested in are Sorted Sets. They work much like normal Sets: you add members, which must be unique (just like mathematical sets), and you can union, intersect, and difference them. However, each member also has an associated non-unique score, and Redis automatically keeps the set sorted by that score.

Sorted Sets have an associated command that is crucial for the current implementation:

ZCOUNT key min max – Returns the number of elements in the sorted set at key with a score between min and max.

ZCOUNT returns the count of elements whose *score* falls in the given range, regardless of the members’ values. This is pretty efficient too, since Redis is already holding that set sorted by score.

The current implementation now works like this:

Every time an error occurs, we record the Unix timestamp (seconds since Jan 1st 1970).

We then add a new member to the error rate set for a worker, where both the member and the score are the Unix timestamp. We also reset the expiration for the key to 30 minutes (fewer things to clean up manually later).

zadd gnip:health:worker:<worker_id>:error_instances unix_ts, unix_ts
expire gnip:health:worker:<worker_id>:error_instances 30*60

To calculate the error rate for a worker we can then do:

zcount gnip:health:worker:<worker_id>:error_instances, unix_timestamp_30_minutes_ago, unix_timestamp_now

Because our score is a timestamp, old errors “fall out” of our view when calculating the error rate. Then after 30 minutes of no errors, the key disappears.
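
In Ruby, the record-and-check cycle might look roughly like the sketch below (same assumed redis helper and key layout as before):

ERROR_WINDOW = 30 * 60  # seconds

def record_error(worker_id)
  key = "gnip:health:worker:#{worker_id}:error_instances"
  now = Time.now.to_i
  redis.zadd(key, now, now)        # member and score are both the timestamp
  redis.expire(key, ERROR_WINDOW)  # the set vanishes after 30 quiet minutes
end

def error_rate(worker_id)
  key = "gnip:health:worker:#{worker_id}:error_instances"
  now = Time.now.to_i
  # count only members whose score (timestamp) falls inside the window
  redis.zcount(key, now - ERROR_WINDOW, now)
end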

Redis commands are also very forgiving of state, so a ZCOUNT against a non-existent key is 0, which is usually going to be the happy path.

What’s really great is that instead of one worker generating multiple keys to represent errors, we now create only one key per worker to represent the error rate.

Other Considerations

There are a few other ways we could have implemented this feature. We could have used millisecond timestamps (set members must be unique, so two errors recorded in the same second would collapse into one member), but in our app it’s highly unlikely that a worker would throw multiple errors in the same second.

We could also store the exception message (or even the full stack trace) as the member in the set instead of repeating the timestamp (perhaps prefixed with the timestamp to keep members unique), and then use ZRANGEBYSCORE, just as we used ZCOUNT, to get the list of errors from the last 30 minutes.
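
That variation might look something like the sketch below, reusing the assumed ERROR_WINDOW constant from above; storing “timestamp:message” as the member is just one way to keep members unique.

def record_error(worker_id, exception)
  key = "gnip:health:worker:#{worker_id}:error_instances"
  now = Time.now.to_i
  redis.zadd(key, now, "#{now}:#{exception.message}")
  redis.expire(key, ERROR_WINDOW)
end

def recent_errors(worker_id)
  key = "gnip:health:worker:#{worker_id}:error_instances"
  now = Time.now.to_i
  # returns the stored messages for errors in the last 30 minutes
  redis.zrangebyscore(key, now - ERROR_WINDOW, now)
end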

While this health check (and others like it) was written while developing our product, none of it is specific to our product. Would you, dear Reader, find it helpful if Resque tracked things like error rate, sequential failures, etc. per worker (either in Resque core or an add-on gem)?

To wrap up, Redis has a bunch of great commands and data types to solve real problems quickly and easily! Consider this pattern the next time you’re looking to keep track of the rate of events in a given time window.

Finding Bugs at High Speed: Kafka Thread Safety

Whenever you are adding a new component to a system there are going to be integration pains. What do all these log lines mean? Is this an error I have to worry about, or is it part of normal operation? Sometimes these questions take a while to answer, particularly if the tools you are using are chatty.

Recently, we added Kafka, a distributed messaging system, to our architecture. Kafka lets us focus on what we’re good at, streaming large numbers of activities very quickly, while opening up our architecture to a number of scalability improvements in the future.

When we first started experimenting with Kafka, we had a number of problems with it. Its logging was fairly chatty; sometimes it would even log stack traces under normal operation. This made it hard to figure out whether a particular log message was an error or normal. Also, we had some configuration options set too small: some timeouts were too short, which meant that the client’s connections to the Kafka server were getting interrupted regularly.

One stack trace stood out, though: a null pointer exception from inside the Scala JSON parser (stacktrace).

I searched for the first line in the stack trace and found this issue in Scala’s issue tracker (issues.scala-lang.org/browse/SI-4929). It turned out that Scala’s standard library JSON parser wasn’t thread safe. It has been patched, but not for the version of Scala we’re using with Kafka.

Looking further up the call chain, I saw that the JSON parser was being called from within Kafka’s client code. Specifically, it was being called when the consumers were being rebalanced. Kafka rebalances the consumers for a topic whenever a consumer is added to the topic or its session with ZooKeeper times out. We had our timeouts set too low, so that was happening often. In addition, we were creating multiple consumers within the same JVM. The end result was that we were parsing JSON from multiple threads at around the same time, fairly frequently, with a thread-unsafe parser. This was a recipe for a concurrency problem.

We had already forked Kafka to make some configuration changes, so I worked up a patch for our fork. I changed Kafka’s consumer code so it would create a new JSON parser each time it asked for topic information from ZooKeeper. With that change, the separate threads were no longer sharing a parser.

Creating new parsers every time Kafka needed data from ZooKeeper added a small amount of overhead in exchange for thread safety. Since rebalancing should happen infrequently, adding that overhead didn’t have much effect on performance. That was especially true after we reconfigured the ZooKeeper timeouts so we stopped rebalancing unnecessarily.

To wrap things up, I filed a ticket on Kafka’s issue tracker with my patch (issues.apache.org/jira/browse/KAFKA-379). A fix was put in and now new releases of Kafka will not have thread safety issues with JSON parsing.

Application Deployment at Gnip

Managing your application code on more than 500 servers is a non-trivial task. One of the tenets we’ve held onto closely as an engineering team at Gnip is “owning your code all the way to the metal”. In order to promote this sense of ownership, we try to keep a clean and simple deployment process.

To illustrate our application deployment process, let’s assume that we’re checking in a new feature to our favorite Gnip application, the flingerator. We will also assume that we have a fully provisioned server that is already running an older version of our code (I’ll save provisioning / bootstrapping servers for another blog post). The process is as follows:

1. Commit: git commit -am "er/ch: checking in my super awesome new feature"
2. Build: One of our numerous cruisecontrol.rb servers picks up the changeset from git and uses maven to build an RPM.
3. Promote: After the build completes, run cap flingerator:promote -S environment=review.
4. Deploy: Run cap flingerator:roll -S environment=review.

Let me break down what is happening at each step of the process:

Commit
Every developer commits or merges their feature branch into master. Every piece of code that lands on master is code reviewed by the developer who wrote the code and at least one other developer. The commit message includes the initials of the developer who wrote the feature as well as the person who code reviewed it. After the commit is made, the master branch is pushed up to github.

Build
After the commit lands on master, our build server (cruisecontrol.rb) uses maven to run automated tests, build jars, and create RPM(s). After the RPM is created, cruisecontrol.rb then copies said RPM to our yum repo server into the “build” directory. Although the build is copied to our yum repo server, it is not ready for deployment just yet.

Promote
After cruise.rb has successfully transferred the RPM to the yum server’s “build” directory, the developer can promote the new code into a particular environment by running the following capistrano command: cap flingerator:promote -S environment=review. This command uses capistrano to ssh to the yum repo server and create a symlink from the “build” directory to the review (or staging or prod) environment directory. This action makes said RPM available to install on any server in a particular environment via yum.

Deploy
Now that the RPM has been promoted, it is available via the Gnip yum repo. It is up to the dev to run another capistrano command to deploy the code: cap flingerator:roll -S environment=review. This command ssh’es to each flingerator server and runs “yum update flingerator”, which installs the new code onto the filesystem. After the “yum update” completes successfully, the application process is restarted and the new code is running.
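
For flavor, here is a stripped-down sketch of what those two Capistrano tasks could look like. The role names, paths, and restart command are invented for illustration; this is not our actual deploy script.

namespace :flingerator do
  desc "Symlink the newest build RPM into an environment's yum directory"
  task :promote, :roles => :yum_repo do
    # `environment` is supplied on the command line via -S environment=review
    run "ln -sf /srv/yum/build/flingerator-latest.rpm /srv/yum/#{environment}/flingerator-latest.rpm"
  end

  desc "Install the promoted RPM on each app server and restart the service"
  task :roll, :roles => :app do
    run "sudo yum -y update flingerator"
    run "sudo /etc/init.d/flingerator restart"
  end
end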

This process uses proven technologies to create a stable and repeatable deployment process, which is extremely important in order to provide an enterprise grade customer experience.

Welcome to Gnip’s Engineering Blog

One thing you learn in a startup is to never get too tied to where you are today. You’re trying to find a new market or serve an existing one in a new way. Not only is the business bound to change over time, but the technology as well. As an introduction to our Engineering blog, here’s a brief history of the evolution of Gnip’s technology.

When Gnip first started it set out to “solve the polling problem”. The world of public web APIs was younger than it is now, social media itself younger still, and the business of social media data barely in its infancy. The vast, vast majority of infrastructure was built around HTTP polling APIs. The amount of data available wasn’t nearly as large as it is today (to make a monumental understatement), so the polling APIs were generally adequate for most uses.

While relatively straightforward to implement and consume, polling APIs have obvious inefficiencies on both ends. Gnip’s first system (well, technically second) was largely built to turn polling APIs into push APIs. It gathered data through various means like polling, sent the data through a centralized pipeline, and finally filtered and fanned out the data to all consumers. This stage of the technology was characterized by optimizing polling APIs, both in the way we gathered data and the way we sent it to consumers. Sure, we got some data via other mechanisms like XMPP as well, but request/response APIs ruled, so they took up much of our time.

At the same time we offered a more efficient push API to our consumers via WebHooks, but many still wanted to use polling because of its relative ease and certain advantages such as easier firewall navigation. This system, known these days as “the 1.0 system”, was a centralized, clustered system written largely in Java with some Python, Ruby and other languages thrown in.

Back then the market for social media data wasn’t nearly as well defined as it is today. As the marketplace evolved it became clear a change was needed. Publishers wanted to have visibility into who was hitting their APIs, not be shielded by a third party. They had invested in their APIs and could handle the traffic of polling by individual consumers just fine.

Enter the second phase of Gnip’s technology: the appliance model.

Instead of a centralized system gathering data once for all customers, the appliance model accepted the overall inefficiency and had each customer get the data from the source API separately. Built with the well-known technology stack of Ruby, Rails, and MySQL, each customer got its own EC2 instance on which Gnip’s software collected data on their behalf, and only their behalf. No data was shared between customers. Many of the lessons learned in the 1.0 system applied: polling most efficiently and predictably while honoring the publisher’s rate limits and other Terms of Service. Beyond that, other, more efficient techniques of delivering data, such as WebHooks and streaming HTTP, were starting to gain traction and were included in the appliance implementation.

Nothing stands still though, and the business environment continued to evolve to where publishers saw growing demand for the data for legitimate commercial and other uses. Partnerships were formed, requirements around quality of service got stronger, and volumes continued to grow. We put a lot into the appliances to make them reliable and easy-to-use, but ultimately as a single instance solution they have their limits in capacity. Volumes from the largest publishers were growing beyond them.

So here we are at phase three: a centralized Java system with a healthy dose of Ruby. It’s come full circle in some ways, though with many evolutionary improvements over the 1.0 system. And much larger scale. Some of those more efficient models of consumption are core to this system and allow it to handle far more data. Ruby applications play an important and larger role than in the 1.0 system, so we’ve ended up with a great balance of tools in our toolbox. The appliances continue to serve more moderate volume use cases quite well with unique ease-of-use, insight, and control for the user.

Along the way we’ve learned plenty of lessons and benefited greatly as developers from the writing of others sharing their experiences. So here we’ll share our own in hopes it may help others and aid in our own continued evolution as well.