Finding Bugs at High Speed: Kafka Thread Safety

Whenever you are adding a new component to a system there are going to be integration pains. What do all these log lines mean? Is this an error I have to worry about, or is it part of normal operation? Sometimes these questions take a while to answer, particularly if the tools you are using are chatty.

Recently, we added Kafka, a distributed messaging system, to our architecture. Kafka lets us focus on what we’re good at: streaming large numbers of activities very quickly, while opening up our architecture to a number of scalability improvements in the future.

When we first started experimenting with Kafka, we had a number of problems with it. Its logging was fairly chatty, and it would sometimes log stack traces under normal operation, which made it hard to tell whether a particular log message was an error or expected behavior. We also had some configuration options set too low: a few timeouts were too short, which meant that the client’s connections to the Kafka server were getting interrupted regularly.

One stack trace stood out, though: a null pointer exception from inside the Scala JSON parser (stacktrace).

I searched for the first line in the stack trace and found this issue in Scala’s issue tracker (issues.scala-lang.org/browse/SI-4929). It turned out that Scala’s standard library JSON parser wasn’t thread-safe. It has since been patched, but not in the version of Scala we’re using with Kafka.
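To see why that matters, here is a small sketch (not code from Kafka) that hammers the shared parser from several threads. On the affected Scala versions, a run like this can intermittently blow up with a NullPointerException, because the singleton parser keeps mutable state internally.

    import scala.util.parsing.json.JSON

    // Minimal sketch (not Kafka code): several threads share the standard
    // library's singleton JSON parser, which holds mutable state and is not
    // thread-safe in the affected Scala versions.
    object ParserRace {
      def main(args: Array[String]): Unit = {
        val doc = """{"topic": "activities", "partitions": 4}"""
        val threads = (1 to 8).map { _ =>
          new Thread(new Runnable {
            def run(): Unit = {
              for (_ <- 1 to 10000) JSON.parseFull(doc) // shared, thread-unsafe parser
            }
          })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
      }
    }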

Looking further up the call chain, I saw that the JSON parser was being called from within Kafka’s client code, specifically when consumers were being rebalanced. Kafka rebalances consumers for a topic whenever a consumer is added to the topic or its session with ZooKeeper times out. We had our timeouts set too low, so that was happening often. In addition, we were creating multiple consumers within the same JVM. The end result was that we were frequently parsing JSON from multiple threads at around the same time with a thread-unsafe parser. This was a recipe for a concurrency problem.
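For context, the relevant knob is the consumer’s ZooKeeper session timeout. Something along these lines is what we ended up tuning; the property names and values below are illustrative only and vary between Kafka releases, so treat them as assumptions rather than exact settings.

    import java.util.Properties

    // Illustrative only: exact consumer property names differ across Kafka
    // releases (older clients used names like zk.connect and
    // zk.sessiontimeout.ms), so treat these keys and values as assumptions.
    val consumerProps = new Properties()
    consumerProps.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181")
    consumerProps.put("group.id", "activity-stream-consumers")
    // If the session timeout is shorter than an ordinary GC pause or network
    // hiccup, ZooKeeper expires the session and Kafka rebalances the
    // consumers, which is what kept re-running the JSON parsing for us.
    consumerProps.put("zookeeper.session.timeout.ms", "6000")
    consumerProps.put("zookeeper.connection.timeout.ms", "6000")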

We already had forked Kafka to make some configuration changes, so I worked up a patch for our fork. I changed Kafka’s consumer code so it would create a new JSON parser each time it asked for topic information from ZooKeeper. With that change, the separate threads were no longer using a shared parser.
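The shape of that change looks roughly like this. It is a sketch of the idea rather than the actual patch we submitted: it mirrors what the standard library’s JSON.parseRaw does internally, but builds a fresh Parser for every call instead of going through the shared singleton.

    import scala.util.parsing.json.{JSONType, Parser}

    // Sketch of the approach, not the actual Kafka patch: construct a new
    // parser for every parse so concurrent rebalances never share parser state.
    def parseWithFreshParser(input: String): Option[JSONType] = {
      val p = new Parser // no shared mutable state between calls
      p.phrase(p.root)(new p.lexical.Scanner(input)) match {
        case p.Success(result, _) => Some(result)
        case _                    => None
      }
    }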

Creating new parsers every time Kafka needed data from ZooKeeper added a small amount of overhead in exchange for thread safety. Since rebalancing should happen infrequently, adding that overhead didn’t have much effect on performance. That was especially true after we reconfigured the timeouts with ZooKeeper so we stopped rebalancing unnecessarily.

To wrap things up, I filed a ticket on Kafka’s issue tracker with my patch (issues.apache.org/jira/browse/KAFKA-379). A fix was put in, and new releases of Kafka will not have thread-safety issues with JSON parsing.