Custom Event Batching with Apache Kafka

Kafka provides a fair amount of configuration and pluggability out of the box. With configuration settings, you can control many features on the producer, broker, and consumer. Beyond simple settings, further customization can be found in several pluggable interfaces.

Looking at the producer side in particular, things like synchronous vs. asynchronous behavior, flush times, and compression are easily controlled with configuration settings. Taking the next step, several simple interfaces let you customize how generic types are converted to Kafka messages before they are sent to the broker. A callback interface (kafka.producer.async.CallbackHandler) also allows for recording or customizing data as it goes through the various stages of the AsyncProducer.

If you need still more control and are willing to take more responsibility, the kafka.producer.async.IEventHandler interface provides the hook-point. Implementing this interface is a bit lower level as you’re dealing with batching and sending to the broker yourself. Let’s take a look at an example to see what’s involved.

The standard AsyncProducer does a good job of batching the messages that have been queued up by topic and broker, but the result is a single kafka.message.Message for each item you’ve queued to the producer. Suppose instead you want to batch all messages for a particular topic/broker into a single Message. In this example we’ll simply concatenate the original string messages into a comma-delimited string that goes to the broker.

The interface to implement, assuming you’re doing this in Java, is kafka.javaapi.producer.async.EventHandler<T>. The primary method of interest is:

public void handle(List<QueueItem<String>> items, SyncProducer producer, Encoder<String> encoder)

This method is called for each batch and is responsible for getting the QueueItems sent to the broker. We see that the underlying SyncProducer is passed in so that should be easy enough. Note that the String type parameter for QueueItem will vary based on the type your application is posting to the producer.

Here is what an implementation of that might look like:

Simple enough, but the interesting behavior is in the helper methods. First off, we have a simple value class Destination that helps us map topics to partitions on the broker:

class Destination {
    private final String topic;
    private final int partition;

    public Destination(int partition, String topic) {
        this.partition = partition;
        this.topic = topic;
    }

    public String getTopic() {
        return topic;
    }

    public int getPartition() {
        return partition;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        Destination that = (Destination) o;

        if (partition != that.partition) return false;
        if (!topic.equals(that.topic)) return false;

        return true;
    }

    @Override
    public int hashCode() {
        int result = topic.hashCode();
        result = 31 * result + partition;
        return result;
    }
}

Next is our helper that groups each of the QueueItems by the destination to which it is being sent:

Again, nothing too surprising here. Now that we have them grouped by destination, this is where we’ll deviate from the standard behavior. The createRequests method is responsible for creating the actual requests that go to the broker (through the SyncProducer).

Notice the granularity of the original messages as they relate to a producer request and Kafka message. Multiple QueueItems are put into a single Kafka message as seen in the getKafkaMessage() method that concatenates all of the original messages for a destination into a single comma-delimited string:

While this method returns a List<Message>, this is merely a convenience for the caller; the list will only ever contain the single message. Also, note that we get the Encoder and the ProducerConfig in the callback, which allows us to honor those settings.
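As a rough, Kafka-free illustration of the grouping and concatenation described above, the sketch below groups plain strings by a (topic, partition) key and joins each group into one comma-delimited payload. The Item class and the method names are hypothetical stand-ins for QueueItem and the helper methods, not the Kafka API. A real handler would still encode each joined string with the configured Encoder and send it through the SyncProducer.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Kafka-free sketch: Item is a hypothetical stand-in for QueueItem<String>.
class BatchingSketch {

    static final class Item {
        final String topic;
        final int partition;
        final String data;

        Item(String topic, int partition, String data) {
            this.topic = topic;
            this.partition = partition;
            this.data = data;
        }
    }

    // Group payloads by (topic, partition), preserving arrival order.
    static Map<SimpleImmutableEntry<String, Integer>, List<String>> groupByDestination(List<Item> items) {
        Map<SimpleImmutableEntry<String, Integer>, List<String>> grouped = new LinkedHashMap<>();
        for (Item item : items) {
            SimpleImmutableEntry<String, Integer> key =
                    new SimpleImmutableEntry<>(item.topic, item.partition);
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(item.data);
        }
        return grouped;
    }

    // Collapse each group into a single comma-delimited payload per destination.
    static Map<SimpleImmutableEntry<String, Integer>, String> concatenate(List<Item> items) {
        Map<SimpleImmutableEntry<String, Integer>, String> result = new LinkedHashMap<>();
        groupByDestination(items).forEach((dest, payloads) ->
                result.put(dest, String.join(",", payloads)));
        return result;
    }

    public static void main(String[] args) {
        List<Item> batch = new ArrayList<>();
        batch.add(new Item("tweets", 0, "a"));
        batch.add(new Item("tweets", 1, "b"));
        batch.add(new Item("tweets", 0, "c"));
        concatenate(batch).forEach((dest, payload) ->
                System.out.println(dest.getKey() + "/" + dest.getValue() + " -> " + payload));
    }
}
```

Keep in mind that a plain comma join can only be split apart again on the consuming side if the original messages never contain commas.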

Finally, the EventHandler is passed to the Producer when it’s constructed either through a direct constructor call or by setting the “event.handler” producer property.

As is often the case with examples, this one does not implement all functionality and does little error-checking in the interest of clarity. The standard behavior has additional functionality such as compression on a per-topic basis, so you’ll want to pay attention to that if you’re taking advantage of those features.

 

 

The Quick Left Rube Goldberg Hackfest

Do you like to geek out? Like to code? To tinker? To hack? Ever wondered what it’s like to wrangle the Twitter firehose? Like to build overly complicated and interesting machines that perform a simple task? Of course you do! The Quick Left Rube Goldberg Hackfest on March 27th is your chance to dabble in a little of all of that!

We are very excited to be opening up some real, live Twitter (and possibly other publishers’) streams for participants to develop against. Our product streams are consumed daily by social media monitoring firms, Fortune 500 companies and the like to perform important business analysis. But who cares about that? This will be your opportunity to dream up a ridiculous use for them in your own recipe and hack together a goofy, complicated and, most importantly, fun machine.

Gnip will be co-sponsoring the hackfest alongside ATOMS Express, Mobiplug, and KST. Whether you want to dive in headfirst and play with all of these toys to your heart’s content (well, just 3 hours, and please don’t break them) or you just want to come hang out and watch the madness ensue, all are welcome. There will be beer and certainly some snackage. Participants, make sure to register for the event beforehand! See you there!

 

The Eclectic Art Of System Monitoring

The Problem

At Gnip, we deploy our software to hundreds of servers across multiple data centers and heterogeneous environments. Architecting the software and creating a streamlined deployment process at this scale is worth a full blog post in itself. In this post, though, I would like to focus on how to make sure that your full application stack is healthy all the time.

The Solution?

On the mechanical side the solution is very straightforward: you deploy a system monitoring tool that continuously watches all servers and all applications and sends you friendly messages when something does not look right. There are plenty of tools available for this purpose. We ourselves use Nagios, which has become to some extent an industry standard.

That was easy, right?

Sadly, that one-sentence solution leaves out a lot of details, and these details can kill your application stack and your engineering team at the same time (either by not finding problems or by keeping your engineers up all night). In this post, I’d like to share how we at Gnip go about keeping software, hardware, and people-ware happy.

Watching All Applications

It is certainly not enough to just ensure that an application is running. In order to get meaningful insight, you have to add health checks that are particular to the application itself. For instance, in an application that consumes a continuous stream of data, you can be fairly certain that there is a problem if the transaction volume drops below a certain rate. Nagios integrates quite easily with these kinds of application-specific health checks. We ourselves decided to expose an HTTP endpoint on each application that returns a digest of all health checks registered for that application. This allows the Nagios check to be uniform across all applications.

TIP: Standardize health APIs
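To make the idea of a uniform health digest a little more concrete, here is a minimal sketch of a per-application registry of named checks that collapses into a single status line. The class name, method names, and digest format are all assumptions made for illustration; this is not our actual endpoint, and the HTTP layer is left out entirely.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical sketch of a per-application health-check registry.
class HealthDigest {
    private final Map<String, BooleanSupplier> checks = new LinkedHashMap<>();

    // Register a named check; it should return true when healthy.
    void register(String name, BooleanSupplier check) {
        checks.put(name, check);
    }

    // Run every registered check; a check that throws counts as failed.
    Map<String, Boolean> run() {
        Map<String, Boolean> results = new LinkedHashMap<>();
        for (Map.Entry<String, BooleanSupplier> entry : checks.entrySet()) {
            boolean ok;
            try {
                ok = entry.getValue().getAsBoolean();
            } catch (RuntimeException ex) {
                ok = false;
            }
            results.put(entry.getKey(), ok);
        }
        return results;
    }

    // One-line summary that a monitoring check could fetch and parse.
    String digest() {
        Map<String, Boolean> results = run();
        StringBuilder sb = new StringBuilder(results.containsValue(false) ? "CRITICAL" : "OK");
        results.forEach((name, ok) ->
                sb.append(' ').append(name).append('=').append(ok ? "OK" : "FAIL"));
        return sb.toString();
    }

    public static void main(String[] args) {
        HealthDigest health = new HealthDigest();
        health.register("disk", () -> true);
        health.register("volume", () -> false);
        System.out.println(health.digest());
    }
}
```

Because every application reports in the same shape, the monitoring side only needs one check definition regardless of what the individual checks measure.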

Sending Friendly Messages

We configured our system so that certain types of alerts in our production system will cause text messages to be sent to the engineer who is on call. All engineers participate in this on-call rotation. This process ensures a strong sense of ownership, because it does not take many pages in the middle of the night to change an engineer’s attitude towards writing reliable and scalable code. (If it doesn’t bother the engineer herself, the grumpy spouse will have a strong opinion, too.)

TIP: Every engineer participates in on-call rotation

We also have a set of health checks that will send out emails instead of pages. We use this for situations where it appears that no immediate action can or has to be taken.

TIP: Categorize actionable and non-actionable alerts

What Does Healthy Mean Anyway?

In this setup it quickly becomes apparent that finding the right thresholds for triggering alerts is nontrivial. Due to inconsistent inbound data volumes, it is easy to set thresholds that are too aggressive, which leads to a flood of meaningless noise drowning out critical failures. The opposite mistake is to set thresholds that are too casual, in which case you’ll never find out about real problems. So, instead of using your gut instinct for setting thresholds, mine data like log files and metrics to figure out what thresholds would historically have been appropriate.

TIP: Mine metrics for proper thresholds
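One possible way to mine a threshold from history, sketched under stated assumptions: take a low percentile of past per-minute volumes and scale it by a safety factor. The nearest-rank percentile, the sample numbers, and the 0.5 factor are illustrative choices, not the values we use in production.

```java
import java.util.Arrays;

// Hypothetical sketch: derive a low-volume alert threshold from historical rates.
class ThresholdSketch {

    // Nearest-rank percentile: the smallest sample such that at least
    // p percent of all samples are less than or equal to it.
    static double percentile(double[] samples, double p) {
        if (samples.length == 0) {
            throw new IllegalArgumentException("no samples");
        }
        if (p <= 0 || p > 100) {
            throw new IllegalArgumentException("p must be in (0, 100]");
        }
        double[] sorted = samples.clone();
        Arrays.sort(sorted);
        int rank = (int) Math.ceil(p * sorted.length / 100.0);
        return sorted[Math.max(rank, 1) - 1];
    }

    // Alert when the current rate drops below a fraction of a historically low value.
    static double lowVolumeThreshold(double[] historicalRates, double p, double safetyFactor) {
        return percentile(historicalRates, p) * safetyFactor;
    }

    public static void main(String[] args) {
        // Made-up per-minute transaction counts standing in for mined metrics.
        double[] history = {120, 95, 130, 110, 100, 105, 90, 125, 115, 98};
        System.out.println("alert below: " + lowVolumeThreshold(history, 10, 0.5));
    }
}
```

Raising the percentile or the safety factor makes the alert more aggressive, which is exactly the trade-off between noise and missed problems described above.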

Self Healing

As it turns out, you periodically get the same alert for the same reason, and the fix is to manually step through some actions that usually resolve the problem. At that point, the obvious question is why not automate the fix and trigger it directly when the health check detects the problem. Engineers tend to postpone the automation effort, partly because it is not interesting feature work. Additionally, it can be tremendously tedious to automate the manual steps, since they frequently involve tools that application developers are not always exposed to (cron, bash, etc.).

Sometimes you might have to spend a couple of hours automating something that takes one minute to do manually. The importance of reducing context switches from feature work cannot be stressed enough, and the investment will always pay off.

TIP: Application developers learn a new tool every week!

Conclusion

One can categorize issues in a production system by assessing whether an issue raised an alert, whether the issue requires manual intervention, and whether the issue is an actual problem or just a reporting flaw. The following Venn diagram illustrates a situation that leads to unhappy customers and tired engineers:

So, for a smoothly running engineering organization you clearly want to focus your energy on getting to a better state.

TIP: Get to this state:

Chef, EC2 Auto Scaling, and Spot Instances for Fun and Profit

If you have a problem with dynamic CPU and/or I/O requirements that is easily distributed across homogeneous nodes, EC2 Auto Scaling can be a great tool to add to your stack.  If you can design your application so that it gracefully handles nodes terminating while processing work, Amazon EC2 Spot Instances can provide great cost savings.  We’ve used these technologies in conjunction with Resque with great success.  This model would likely fit other job management frameworks such as Sidekiq or Gearman.

If you’re in your own datacenter you can achieve similar functionality with Eucalyptus or OpenStack/Heat, but we’ll focus on using EC2 Auto Scaling as a concrete example for this blog post.  By the end of this post, you should have a good idea of how to set up your own EC2 Auto Scaling cluster using Chef and Spot instances.

Create a Package That Installs and Runs Chef
You’ll want to create a package (rpm, deb, etc.) that contains definitions of all the software necessary to start “doing work” on your newly provisioned worker node.  We achieved this by creating an RPM that includes our chef cookbooks using the process outlined in this previous blog post.  In the next step we’ll create an AMI that will install these cookbooks via yum when our node boots up and then run chef to turn our base server into a fully functional worker node.

Create Base AMI
We subscribe to the principle that you should have a small number of Amazon Machine Images (AMIs) that have only the minimal amount of information to bootstrap themselves.  This information is typically just:

1.  Which environment am I currently in? (review, staging, or prod)
2.  What type of node am I?  (worker node, web server, etc.)
3.  Where can I download the necessary cookbooks to provision myself? (configure yum or deb sources)

Following this principle, we have created an AMI that runs a “firstboot.sh” init.d script.  This first boot script will configure the node to look at the appropriate yum repo and install the RPM we created in the previous step.  This way the AMI can remain relatively static and you can iterate on your bootstrap code without having to follow the cumbersome process of creating a new AMI each time.  After the cookbooks have been pulled down to the local filesystem, our first boot script will run chef solo to install the necessary software to start “doing work”.

In order to create your own AMI, you’ll need to:

1.  Boot a new EC2 Server.
2.  Install Amazon EC2 Command Line Tools
3.  Customize your server to run your own “first boot” script that will install and run chef-solo.
4.  Run the following command to bundle your server to a compressed image in the /tmp directory:

ec2-bundle-vol -k /path/to/aws-priv-key.pem \
-c /path/to/aws-cert.pem -u <aws-account-id>

5.  Upload your bundled image to S3

ec2-upload-bundle -b <s3_path> -m /tmp/image.manifest.xml \
-a <access_key_id> -s <secret_key>

6.  Register your newly created AMI with your account

ec2-register <s3_path>/image.manifest.xml \
-K /path/to/aws-priv-key.pem -C /path/to/aws-cert.pem

Create Auto Scaling Group
Now that we have an AMI that will start performing work upon boot, we can leverage Amazon’s EC2 Auto Scaling to start going wide.  The idea is that you define a cluster of machines that use the AMI we just created and you define how many servers you want up at any given time.  If you want to spin up 50 servers, you simply set the “DesiredCapacity” for your group to 50 and within minutes you will have 50 fresh new worker nodes.  There are two discrete steps needed to make this happen.  Let’s illustrate how to do this with Fog:

Create Launch Config

as = Fog::AWS::AutoScaling.new(:aws_access_key_id => access_key_id, 
                               :aws_secret_access_key => access_secret_key)
as.create_launch_configuration(<ami_id>, 
                               <machine_type>, 
                               <launch_config_name>, 
                               "SecurityGroups" => <security_groups>, 
                               "KeyName" => <aws_key_pair_name>, 
                               "SpotPrice" => <spot_bid_price>)

This will create a launch configuration that we will use to define our Auto Scaling group.

Create Auto Scaling Group

as.create_auto_scaling_group(<auto_scaling_group_name>, 
                             <availability_zones>, 
                             <launch_config_name>, 
                             <max_size>, <min_size>, 
                             "DesiredCapacity" => <number_of_instances>)

This will create an Auto Scaling Group and will spin up <number_of_instances> servers using the AMI defined in our launch configuration above.

Note that one of the parameters we’ve passed to our Launch Configuration is “SpotPrice”.  This allows you to leverage Amazon’s Spot Instances.  The idea is that you pay whatever the “market rate” is for the machine_type you’re provisioning.  If the market rate rises above your SpotPrice, instances in your cluster will begin to terminate.  Your application should be tolerant of these failures.  If this is a mission critical application, you should likely create a “backup” Auto Scaling group without the SpotPrice parameter.  This means you will pay the On-Demand price for your machine_type, but it will allow you to continue to process work when Spot capacity is scarce.

Grow / Shrink Auto Scaling Group

Depending on your application, you’ll likely want to grow and shrink your Auto Scaling group depending on how much work needs to be done.  This is as simple as the following API call:

as.set_desired_capacity(<auto_scaling_group_name>, 
                        <number_of_instances>)

The ability to easily spin up 500 worker nodes with one API call can be a very powerful thing, especially when dealing with the amount of data we deal with at Gnip.
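How you pick the number of instances is application specific. As one hedged sketch, the desired capacity could be derived from the depth of the work queue and clamped to the group’s min and max size. The jobsPerWorker figure and the method name are assumptions for illustration; the result would be the value passed to set_desired_capacity.

```java
// Hypothetical sketch: derive a desired capacity from the amount of pending work.
class CapacitySketch {

    // Ceiling-divide pending jobs by the jobs one worker is assumed to handle,
    // then clamp the result to the Auto Scaling group's min/max size.
    static int desiredCapacity(long pendingJobs, int jobsPerWorker, int minSize, int maxSize) {
        if (pendingJobs < 0 || jobsPerWorker <= 0 || minSize > maxSize) {
            throw new IllegalArgumentException("invalid sizing parameters");
        }
        long needed = (pendingJobs + jobsPerWorker - 1) / jobsPerWorker;
        return (int) Math.max(minSize, Math.min(maxSize, needed));
    }

    public static void main(String[] args) {
        System.out.println(desiredCapacity(1050, 100, 1, 500));
    }
}
```

Clamping matters with Spot Instances in particular, since a burst of queued work should not be able to request more capacity than the group is configured to allow.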

Future improvements to this process include wiring it into Chef Server for easier centralized config management across datacenters.  If these sorts of challenges sound interesting, be sure to check out our job postings. We’d love to talk to you!

Understanding the Kafka Async Producer

We’ve been talking recently about our use of Apache Kafka–a system that takes a different view of implementing messaging. Because we’re using Kafka in a high-volume area of our system, we’ve had to spend time understanding how to get messages through reliably at high rates. While we’ve spent time on both the producing and consuming side of the brokers, this post focuses on the producing side only.

At a high level Kafka offers two types of producers–synchronous and asynchronous. As a user of the producer API, you’re always interacting with the kafka.producer.Producer class, but it’s doing a fair bit for you under-the-covers depending on how you’ve configured it.

By default the Producer uses SyncProducers underneath. SyncProducer does what it sounds like–it sends the messages to the broker synchronously on the thread that calls it. In most high volume environments this obviously isn’t going to suffice. It’s too many network sends on too many small-ish messages.

By setting the producer.type flag to ‘async’, the Producer class uses AsyncProducers under the covers. AsyncProducer offers the capability not only to do the sends on a separate thread, but to batch multiple messages together into a single send. Both characteristics are generally desirable–isolating network I/O from the threads doing computation and reducing the number of network messages into a smaller number of larger sends. Introductory information on the producers can be found here.

An important note is that Kafka supports compressing messages transparently such that neither your producer nor your consumer code has to do anything special. When configured for compression (via the compression.codec producer config setting) Kafka automatically compresses the messages before sending them to the broker. Importantly, this compression happens on the same thread as the send.

Because the compress and send are both relatively time consuming operations, it’s an obvious place you may want to parallelize. But how parallel is the Producer?

A producer configured for async mode creates a separate underlying AsyncProducer instance for each broker via its internal ProducerPool class. Each AsyncProducer has its own associated background thread (named ProducerSendThread) that does the work for the send. In other words, there’s one thread per broker so your resulting parallelism for compressing and sending messages is based on the number of brokers in the cluster.

Here’s a snippet of the threads in an application with a single broker:

[Image: AsyncProducer send threads with a single broker]

And here’s what they look like under the same producer load with 3 brokers in the cluster:

[Image: AsyncProducer send threads with 3 brokers]

In the first image you can see that the single thread is doing more work than when it’s spread out, as you would expect.

A common question is “Why am I seeing QueueFullExceptions from the producer?” This often means the ProducerSendThreads are saturated and unable to keep up with the rate of messages going into the Producer. An obvious approach to deal with this is adding brokers to the cluster and thus spreading the message sends across more threads, as well as lightening the load on each broker.
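The mechanics behind a full queue can be modeled without Kafka at all. The sketch below is not the AsyncProducer implementation; it is a hypothetical bounded queue with a non-blocking enqueue on the producing side and a batch drain on the sending side, which is enough to show that enqueues start failing once the sender falls behind.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of a bounded producer queue drained by a single send thread.
class BoundedQueueSketch {
    private final BlockingQueue<String> queue;

    BoundedQueueSketch(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: non-blocking enqueue. A false return is the condition
    // that a real producer might surface to the caller as a queue-full error.
    boolean enqueue(String message) {
        return queue.offer(message);
    }

    // Send-thread side: remove up to batchSize queued messages for one send.
    List<String> drainBatch(int batchSize) {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch, batchSize);
        return batch;
    }

    public static void main(String[] args) {
        BoundedQueueSketch producer = new BoundedQueueSketch(2);
        System.out.println(producer.enqueue("a"));
        System.out.println(producer.enqueue("b"));
        System.out.println(producer.enqueue("c")); // queue is full, nothing has drained yet
        System.out.println(producer.drainBatch(2));
        System.out.println(producer.enqueue("c")); // room again after the drain
    }
}
```

With only one draining thread per queue, the only ways to stop rejections are to drain faster, enqueue slower, or add more queues and threads, which is what adding brokers effectively does.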

Adding brokers to the cluster takes more consideration than just how efficient the producers are. Clearly there are other approaches you could take such as building a different async model on top of the SyncProducer that’s not dependent on the number of brokers. But it’s useful to know what the options are, and in the end adding brokers may be useful for your architecture in other ways.

If the topology of your producer applications is such that there are many applications producing a relatively small number of messages then none of this may be of concern. But if you have a relatively small number of producer applications sending many large messages, understanding how to spread out that work can be of real importance.

Tracking Cluster Health With Redis

Historical PowerTrack Background

Our new Historical PowerTrack for Twitter allows you to run filtered searches across the entire Twitter archive. At its heart, it is a cluster of machines where each machine runs a handful of Resque workers that do all of the heavy lifting of processing and filtering the data.

Like any good production application, we have an abundant amount of health reporting in place to monitor problems and quickly respond to issues.

Resque itself runs on top of Redis, an open source data structure server. Since we already have our Redis infrastructure set up to power Resque, we can leverage Redis to track our workers’ health information.

One of the things we track per worker is an “error rate”.  The error rate is the number of errors seen in the last 30 minutes. Once that climbs above a threshold, we then send an alert so that we can examine why that worker is having issues.

Implementation 0

Using Resque Job Hooks, a worker is notified when it fails to process a job.

module HealthChecks
  def on_failure_record_health_status
    # Redis code will go here
  end
end

Inside that callback, we then simply create a new key to represent the failure:

setex gnip:health:worker:<worker_id>:error_instance:<unix_timestamp> 1800 ""

We also give the key an expiry of 30 minutes when we create it.

Then in our health checking code we can simply get the count of error_instances for a given worker_id:

keys gnip:health:worker:<worker_id>:error_instance:*

Since an error instance key dies after 30 minutes, the size of the resulting set from `KEYS` will tell us the error rate! Huzzah!

While this solution works, and was quick to get up and running, there was a catch; namely this tidbit from the Redis documentation on the KEYS command:

“Warning: consider KEYS as a command that should only be used in production environments with extreme care. It may ruin performance when it is executed against large databases. This command is intended for debugging and special operations, such as changing your keyspace layout. Don’t use KEYS in your regular application code. If you’re looking for a way to find keys in a subset of your keyspace, consider using sets.”

Our first implementation is workable, but not very performant or safe. Luckily the docs give us a clue in the warning message: sets.

Current Implementation

Redis describes itself as a data structure server. This means that the value for a given key can be a basic string or number, but can also be a Hash, List, Set, or Sorted Set.

What we’re interested in are Sorted Sets. They work just like normal Sets: you add members, which must be unique (just like mathematical sets), and you can union, intersect, and difference them. However, each member also has an associated non-unique score, and Redis will automatically keep the set sorted by that score for you.

Sorted Sets have an associated command that is crucial for the current implementation:

ZCOUNT key min max – Returns the number of elements in the sorted set at key with a score between min and max.

ZCOUNT will return the count of elements in the range of a *score*, not the value of the members. This is pretty efficient too, since Redis is already holding that set sorted by score.

The current implementation now works like this:

Every time an error occurs, we record the Unix timestamp (seconds since Jan 1st 1970).

We then add a new member to the error rate set for a worker, where both the member and the score are the Unix timestamp. We also reset the expiration for the key to 30 minutes (fewer things to manually clean up later).

zadd gnip:health:worker:<worker_id>:error_instances <unix_ts> <unix_ts>
expire gnip:health:worker:<worker_id>:error_instances 1800

To calculate the error rate for a worker we can then do:

zcount gnip:health:worker:<worker_id>:error_instances <unix_timestamp_30_minutes_ago> <unix_timestamp_now>

Because our score is a timestamp, old errors “fall out” of our view when calculating the error rate. Then after 30 minutes of no errors, the key disappears.

Redis commands are also very forgiving of state, so a ZCOUNT against a non-existent key is 0, which is usually going to be the happy path.

What’s really great is that instead of one worker generating multiple keys to represent errors, we now only create one key for each worker to represent the error rate.
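To see the windowing logic in isolation, here is a small in-memory model of the pattern. It is not Redis client code: a TreeSet of timestamps stands in for the sorted set (member and score are the same value), and the class and method names are made up for this sketch. Key expiry is not modeled.

```java
import java.util.TreeSet;

// In-memory model of the sorted-set error window; names are hypothetical.
class ErrorWindowSketch {
    private final TreeSet<Long> errorTimestamps = new TreeSet<>();

    // Analogous to ZADD key ts ts. Member equals score, so two errors in the
    // same second collapse into a single member, just as they would in Redis.
    void recordError(long unixSeconds) {
        errorTimestamps.add(unixSeconds);
    }

    // Analogous to ZCOUNT key (now - window) now, inclusive on both ends.
    int errorRate(long nowSeconds, long windowSeconds) {
        return errorTimestamps
                .subSet(nowSeconds - windowSeconds, true, nowSeconds, true)
                .size();
    }

    public static void main(String[] args) {
        ErrorWindowSketch worker = new ErrorWindowSketch();
        worker.recordError(1000);
        worker.recordError(1500);
        worker.recordError(4000);
        // Count errors seen in the 30 minutes (1800 seconds) before t=4000.
        System.out.println(worker.errorRate(4000, 1800));
    }
}
```

Old timestamps stay in the set but simply stop being counted once they are older than the window, which mirrors how old errors “fall out” of the ZCOUNT range.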

Other Considerations

There are a few other ways we could have implemented this feature. We could have used milliseconds, but in our app it’s highly unlikely that multiple errors would be thrown in a second.

We could also store the exception message (or even the full stack trace) as the member in the set instead of repeating the timestamp (perhaps prefixed with the timestamp to make it unique), and then use ZRANGEBYSCORE just like we used ZCOUNT to get the list of errors in the last 30 minutes.

While this (and other health checks) were written while developing our product, they are not specific to it. Would you, dear Reader, find it helpful if Resque tracked things like error rate, sequential failures, etc. per worker (either in resque core, or an add-on gem)?

To wrap up, Redis has a bunch of great commands and data types to solve real problems quickly and easily! Consider this pattern the next time you’re looking to keep track of the rate of events in a given time window.

Finding Bugs at High Speed: Kafka Thread Safety

Whenever you are adding a new component to a system there are going to be integration pains. What do all these log lines mean? Is this an error I have to worry about, or is it part of normal operation? Sometimes these questions take a while to answer, particularly if the tools you are using are chatty.

Recently, we added Kafka, a distributed messaging system, to our architecture. Kafka lets us focus on what we’re good at, streaming large numbers of activities very quickly, while opening up our architecture to a number of scalability improvements in the future.

When we first started experimenting with Kafka, we had a number of problems with it. Its logging was fairly chatty. Sometimes it would even log stack traces under normal operation. This made it hard to figure out whether a particular log message was an error or normal. Also, we had some configuration options set too small–some timeouts were set too short, which meant that the client’s connections to the Kafka server were getting interrupted regularly.

One stack trace stood out, though: a null pointer exception from inside the Scala JSON parser (stacktrace).

I searched for the first line in the stack trace and found this issue in Scala’s issue tracker (issues.scala-lang.org/browse/SI-4929). It turned out that Scala’s standard library JSON parser wasn’t thread safe. It has been patched, but not for the version of Scala we’re using with Kafka.

Looking further up the call chain, I saw that the JSON parser was being called from within Kafka’s client code. Specifically, it was being called when the consumers were being rebalanced. Kafka rebalances consumers for a topic whenever either a consumer is added to the topic or its session with ZooKeeper times out. We had our timeouts set too low, so that was happening often. In addition, we were creating multiple consumers within the same JVM. The end result was that we were parsing JSON using multiple threads at around the same time with a thread-unsafe parser and doing it fairly frequently. This was a recipe for a concurrency problem.

We already had forked Kafka to make some configuration changes, so I worked up a patch for our fork. I changed Kafka’s consumer code so it would create a new JSON parser each time it asked for topic information from ZooKeeper. With that change, the separate threads were no longer using a shared parser.

Creating new parsers every time Kafka needed data from ZooKeeper added a small amount of overhead in exchange for thread safety. Since rebalancing should happen infrequently, adding that overhead didn’t have much effect on performance. That was especially true after we reconfigured the timeouts with ZooKeeper so we stopped rebalancing unnecessarily.
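The actual patch lives in Kafka’s Scala consumer code and is not reproduced here. As an analogous sketch in Java, SimpleDateFormat is another parser that is documented as not synchronized, and the same fix applies: build a fresh instance per call instead of sharing one across threads. The class and method names below are hypothetical.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Analogy only: SimpleDateFormat stands in for the thread-unsafe JSON parser.
class ParserPerCallSketch {

    // Each call builds its own parser, so no mutable parser state is shared
    // between threads. The cost is a small allocation per call.
    static Date parseUtcDate(String text) {
        SimpleDateFormat parser = new SimpleDateFormat("yyyy-MM-dd");
        parser.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return parser.parse(text);
        } catch (ParseException e) {
            throw new IllegalArgumentException("unparseable date: " + text, e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Two threads parse concurrently; neither touches the other's parser.
        Runnable task = () -> System.out.println(parseUtcDate("2012-01-01").getTime());
        Thread first = new Thread(task);
        Thread second = new Thread(task);
        first.start();
        second.start();
        first.join();
        second.join();
    }
}
```

When the parsing happens rarely, as with rebalancing, the per-call allocation is a cheap price for removing the shared state altogether.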

To wrap things up, I filed a ticket on Kafka’s issue tracker with my patch (issues.apache.org/jira/browse/KAFKA-379). A fix was put in and now new releases of Kafka will not have thread safety issues with JSON parsing.

Application Deployment at Gnip

Managing your application code on more than 500 servers is a non-trivial task. One of the tenets we’ve held onto closely as an engineering team at Gnip is “owning your code all the way to the metal”. In order to promote this sense of ownership, we try to keep a clean and simple deployment process.

To illustrate our application deployment process, let’s assume that we’re checking in a new feature to our favorite Gnip application, the flingerator. We will also assume that we have a fully provisioned server that is already running an older version of our code (I’ll save provisioning / bootstrapping servers for another blog post). The process is as follows:

1. Commit: git commit -am "er/ch: checking in my super awesome new feature"
2. Build: One of our numerous cruisecontrol.rb servers picks up the changeset from git and uses maven to build an RPM.
3. Promote: After the build completes, run cap flingerator:promote -S environment=review.
4. Deploy: cap flingerator:roll -S environment=review

Let me break down what is happening at each step of the process:

Commit
Every developer commits or merges their feature branch into master. Every piece of code that lands on master is code reviewed by the developer who wrote the code and at least one other developer. The commit message includes the initials of the developer who wrote the feature as well as the person who code reviewed it. After the commit is made, the master branch is pushed up to github.

Build
After the commit lands on master, our build server (cruisecontrol.rb) uses maven to run automated tests, build jars, and create RPM(s). After the RPM is created, cruisecontrol.rb then copies said RPM to our yum repo server into the “build” directory. Although the build is copied to our yum repo server, it is not ready for deployment just yet.

Promote
After cruisecontrol.rb has successfully transferred the RPM to the yum server’s “build” directory, the developer can promote the new code into a particular environment by running the following capistrano command: cap flingerator:promote -S environment=review. This command uses capistrano to ssh to the yum repo server and creates a symlink from the “build” directory to the review (or staging or prod) environment directory. This action makes said RPM available to install on any server in a particular environment via yum.

Deploy
Now that the RPM has been promoted, it is available via the gnip yum repo. It is up to the dev to run another capistrano command to deploy the code: cap flingerator:roll -S environment=review. This command ssh’es to each flingerator server and runs “yum update flingerator”, which installs the new code onto the filesystem. After successful completion of the “yum update”, the application process is restarted and the new code is running.

This process uses proven technologies to create a stable and repeatable deployment process, which is extremely important in order to provide an enterprise-grade customer experience.

Welcome to Gnip’s Engineering Blog

One thing you learn in a startup is to never get too tied to where you are today. You’re trying to find a new market or serve an existing one in a new way. Not only is the business bound to change over time, but the technology as well. As an introduction to our Engineering blog, here’s a brief history of the evolution of Gnip’s technology.

When Gnip first started it set out to “solve the polling problem”. The world of public web APIs was younger than it is now, social media itself younger still, and the business of social media data barely in its infancy. The vast, vast majority of infrastructure was built around HTTP polling APIs. The amount of data available wasn’t nearly as large as it is today (to make a monumental understatement), so the polling APIs were generally adequate for most uses.

While relatively straightforward to implement and consume, polling APIs have obvious inefficiencies on both ends. Gnip’s first system (well, technically second) was largely built to turn polling APIs into push APIs. It gathered data through various means like polling, sent the data through a centralized pipeline, and finally filtered and fanned out the data to all consumers. This stage of the technology was characterized by optimizing polling APIs, both in the way we gathered data and the way we sent it to consumers. Sure, we got some data via other mechanisms like XMPP as well, but request/response APIs ruled, so they took up much of our time.

At the same time we offered a more efficient push API to our consumers via WebHooks, but many still wanted to use polling because of its relative ease and certain advantages such as easier firewall navigation. This system, known these days as “the 1.0 system”, was a centralized, clustered system written largely in Java with some Python, Ruby and other languages thrown in.

Back then the market for social media data wasn’t nearly as well defined as it is today. As the marketplace evolved it became clear a change was needed. Publishers wanted to have visibility into who was hitting their APIs, not be shielded by a third party. They had invested in their APIs and could handle the traffic of polling by individual consumers just fine.

Enter the second phase of Gnip’s technology–the appliance model.

Instead of a centralized system gathering data once for all customers, the appliance model accepted the overall inefficiency and had each customer get the data from the source API separately. Built with the well-known technology stack of Ruby, Rails, and MySQL, each customer got its own EC2 instance on which Gnip’s software collected data on their behalf, and only their behalf. No data was shared between customers. Many of the lessons learned in the 1.0 system applied–polling most efficiently and predictably while honoring the publisher’s rate limits and other Terms of Service. Beyond that, other, more efficient techniques of delivering data such as WebHooks and Streaming HTTP were starting to gain traction and were included in the appliance implementation.

Nothing stands still though, and the business environment continued to evolve to where publishers saw growing demand for the data for legitimate commercial and other uses. Partnerships were formed, requirements around quality of service got stronger, and volumes continued to grow. We put a lot into the appliances to make them reliable and easy to use, but ultimately, as a single-instance solution, they have their limits in capacity. Volumes from the largest publishers were growing beyond them.

So here we are at phase three–a centralized Java system with a healthy dose of Ruby. It’s come full circle in some ways, though with many evolutionary improvements over the 1.0 system. And much larger scale. Some of those more efficient models of consumption are core to this system and allow it to handle far more data. Ruby applications play an important and larger role than in the 1.0 system, so we’ve ended up with a great balance of tools in our toolbox. The appliances continue to serve more moderate volume use cases quite well with unique ease-of-use, insight, and control for the user.

Along the way we’ve learned plenty of lessons and benefited greatly as developers from the writing of others sharing their experiences. So here we’ll share our own in hopes it may help others and aid in our own continued evolution as well.