Engineering

Just Enough Kafka for the Elastic Stack, Part 1

The Elastic Stack and Apache Kafka share a tight-knit relationship in the log/event processing realm. A number of companies use Kafka as a transport layer for storing and processing large volumes of data. In many deployments we've seen in the field, Kafka plays an important role of staging data before making its way into Elasticsearch for fast search and analytical capabilities. Through a series of blog posts, we'd like to shine more light on how to set up and manage Kafka when integrating with the Elastic Stack. Specifically, we'll discuss our experiences operating Kafka and Logstash under high volume.

Note: For the purposes of these posts, we refer to Kafka's 0.8.x version. Some of the functionality has changed in Kafka's latest 0.9.x version, but 0.8.x is still very popular and widely used.

The Basics

Let's get some basic concepts out of the way. From Kafka's documentation:

Apache Kafka is publish-subscribe messaging rethought as a distributed commit log

Kafka was created at LinkedIn to handle large volumes of event data. Like many other message brokers, it deals with publisher-consumer and queue semantics by grouping data into topics. As an application, you write to a topic and consume from a topic. An important distinction, or a shift in design with Kafka is that the complexity moves from producer to consumers, and it heavily uses the file system cache. These design decisions, coupled with it being distributed from scratch, makes it a winner in many high volume streaming use cases.

Logstash integrates natively with Kafka using the Java APIs. It provides both input and output plugins so you can read and write to Kafka from Logstash directly. The configuration to get started is pretty simple:

kafka {
zk_connect => "hostname:port"
topic_id => "apache_logs"
...
}


Kafka has a dependency on Apache ZooKeeper, so if you are running Kafka, you'll need access to a ZooKeeper cluster. More on that later.

When To Use Kafka With The Elastic Stack?

Scenario 1: Event Spikes

Log data or event based data rarely have a consistent, predictable volume or flow rates. Consider a scenario where you upgraded an application on a Friday night (why you shouldn't upgrade on a Friday is for a different blog :) ). The app you deployed has a bad bug where information is logged excessively, flooding your logging infrastructure. This spike or a burst of data is fairly common in other multi-tenant use cases as well, for example, in the gaming and e-commerce industries. A message broker like Kafka is used in this scenario to protect Logstash and Elasticsearch from this surge.

In this architecture, processing is typically split into 2 separate stages — the Shipper and Indexer stages. The Logstash instance that receives data from different data sources is called a Shipper as it doesn't do much processing. Its responsibility is to immediately persist data received to a Kafka topic, and hence, its a producer. On the other side, a Logstash instance — a beefier one — will consume data, at its own throttled speed, while performing expensive transformations like Grok, DNS lookup and indexing into Elasticsearch. This instance is called the Indexer.

While Logstash has traditionally been used as the Shipper, we strongly recommend using the suite of Elastic Beats products available as specialized shippers. Filebeat, for example, is a lightweight, resource friendly agent which can follow files and ship to Kafka via a Logstash receiver.

Note: At this time, Filebeat cannot write directly to Kafka, but starting with 5.0.0 (currently in pre-release state), you'll be able to configure Kafka as one of the outputs. This enhancement further simplifies the above architecture in use cases that ingest data using beats. Please try out this and other awesome new features in our alpha releases, and let us know what you think! Word on the street is that you can even win a free pass to Elastic{ON}17 by helping us test these!

Scenario 2: Elasticsearch not reachable

Consider another scenario. You're planning to upgrade your multi-node Elasticsearch cluster from 1.7 to 2.3 which requires a full cluster restart. Or, a situation where Elasticsearch is down for a longer period of time than you expected. If you have a number of data sources streaming into Elasticsearch, and you can't afford to stop the original data sources, a message broker like Kafka could be of help here! If you use the Logstash shipper and indexer architecture with Kafka, you can continue to stream your data from edge nodes and hold them temporarily in Kafka. As and when Elasticsearch comes back up, Logstash will continue where it left off, and help you catch up to the backlog of data. In fact, this bodes well with the Elastic nature of our software — you can temporarily increase your processing and indexing power by adding extra Logstash instances to consume from the same Kafka topic. You could additionally add extra nodes in Elasticsearch as well. Scaling horizontally without too much hand-holding is one of the core features of Elasticsearch. Once you are caught up, you can scale down to your original number of instances.

Anti-pattern: When not to use Kafka with Elastic Stack

Just as it is good to know when to use Kafka, it is also good knowledge to know when not to use it. Everything has a cost — Kafka is yet another piece of software you need to tend to, in your production environment. This involves monitoring, reacting to alerts, upgrading and everything else that comes with running a software successfully in production. You are monitoring all your production software, aren't you?

When it comes to centralized log management, there is often a blanket statement made that logs need to be shipped off your edge nodes as soon as possible. While this may be true for some use cases, ask yourself if this is really a requirement for you! If you can tolerate a relaxed search latency, you can completely skip the use of Kafka. Filebeat, which follows and ships file content from edge nodes, is resilient to log rotations. This means if your application is emitting more logs than Logstash/Elasticsearch can ingest at real time, logs can be rotated — using Log4j or logrotate, for example — across files, but they will still be indexed. Of course, this brings in a separate requirement of having sufficient disk space to house these logs on your server machines. In other words, in this scenario, your local filesystem will become the temporary buffer.

Design Considerations For Kafka And Logstash

Below we describe some design considerations while using Kafka with Logstash. Logstash input uses the high level Kafka consumer API and Logstash Output uses the new producer API.

Topics

Topics are logical grouping of messages. They provide a way to isolate data from other consumers if necessary. Note, in 0.8 version of Kafka, there is no in-built security, so any consumer can access from any topic available on the broker. How many topics you need and how do you model your data, is, well, dependent on your data. Here are some strategies:

User based data flow: In this case you would be creating a topic per user. Please remember that Kafka registers all partitions in ZooKeeper, so there is a cost of creating hundreds and thousands of topics. If your users are small in number — for example, departments — this strategy of partitioning per user works well.

Attributes based data flow: For logging and event driven data, you could also group multiple users in one topic based on attributes like data volume and expected search latency. Remember that more time events spend in the queue while not getting indexed into Elasticsearch, the longer are the latency for searching these. One solution is to create topics based on expected SLAs — “high”, “medium” and “low” topics. Similarly based on data volume. Do you have a customer/user who is known to produce bursty data? Give them a new topic. In a multi-tenant deployment, it's good practice to have a “bursty” topic, so when a user violates their data volume, or produces too much bursty data in the last X minutes/hours, you can move them, at runtime, to this topic. This way you'll keep other topics clear of unnecessary traffic, thereby not slowing everybody down. A good analogy here is an expressway — you mostly want the fast lane to be free flowing, so slower vehicles are expected to move to other lanes. Think of Kafka topics as lanes and events as cars!

In general, separate topics per data source allows for isolation of sources. In Kafka, you can configure number of partitions per topic. This also means you can scale Logstash instances per topic. If you expect certain sources to grow to a higher volume in the future, you can always over-partition to future proof it.

Topics can be created on the fly when data is first published to a non-existent one, or can be manually pre-created.

kafka-topics.sh --zookeeper zk_host:port --create --topic user1
--partitions 8 --replication-factor 1<span></span>


Partitions

Now's a good time to talk about partitions! From Kafka's documentation:

“The partition count controls how many logs the topic will be sharded into. There are several impacts of the partition count. First each partition must fit entirely on a single server. So if you have 20 partitions the full data set (and read and write load) will be handled by no more than 20 servers (no counting replicas). Finally the partition count impacts the maximum parallelism of your consumers.”

In essence, the more partitions you have, the more throughput you get when consuming data. From the producer standpoint, Kafka provides you an option for controlling which data ends up in which partition. By default, when using Logstash, data is assigned to a partition in a round-robin fashion. By specifying the message_key in Logstash config, you can control how your data is assigned to a partition. In some cases it can be efficient to have fewer topic/partitions to workaround the ZooKeeper limitation, but group multiple users under fixed partitions by using user_id as message_key. If key is specified, a partition will be chosen using a hash of the key.

Another important property to be aware in Kafka is the order of messages. Kafka only guarantees message order within the same partition. So if messages from data source has no key, it gets sprayed across partitions, and Kafka will not guarantee ordering when consuming. Where data is immutable, and in particular, for logging use cases, this can be an acceptable property. If you need strong ordering, make sure that data is pinned to a single partition.

Consumer Groups: Scalability And Fault Tolerance

Multiple Kafka consumers which process data from similar topics form a consumer group designated by unique name in the cluster. Messages published to Kafka are distributed across instances in the group, but each message is handled by just one consumer in the group, i.e. there is no overlap. Logstash instances reading from Kafka form a consumer group with a default group ID called logstash. You can spin up new Logstash instances at any time to scale read throughput for the subscribed topic. By default, the new Logstash instance started will join the logstash consumer group. This process — when a new consumer joins a consumer group — triggers a rebalancing in Kafka. we mentioned before that Logstash uses the high level Kafka consumer, so it delegates rebalancing logic to the Kafka library. This process automatically reassigns the partitions to current consumers based on metadata available in Zookeeper. Another reason to use multiple Logstash instances is to add fault tolerance. If one instance goes down, Kafka goes through rebalancing process and distributes assignments to existing Logstash instances.

All this closely relates to consumer_threads setting in Logstash input. This setting controls the number of threads consuming from Kafka partitions. Ideally you should have as many threads as the number of partitions for a perfect balance — more threads than partitions means that some threads will not have anything do. Fewer threads than partition means some threads are consuming from more than one partition.

Consider a scenario where topic apache_logs that has 16 partitions. I could spin up one Logstash instance on an 8 core machine with this configuration:

input {
kafka {
zk_connect => "kafka:2181"
group_id => "logstash"
topic_id => "apache_logs"
}
}


Or we could spin up 2 Logstash instances on 2 machines with consumer_threads set to 8 each. The latter deployment is a better choice — it fully utilizes the machine's CPU, but also adds fault tolerance in case of catastrophic failures.

Usually, try to ensure that the number of partitions is a multiple of the number of Logstash threads/instances. This ensures instances are balanced. Partitioning, similar to Elasticsearch sharding, means we can add more processing capacity (Logstash instances) later.

Serialization Formats

Kafka persists messages using byte arrays in its queue. So you can pretty much throw any format at Kafka, but in general its is recommended to use a serialization format which is compact and fast. Kafka has a way to deal with serialized message formats by specifying the value_serializer in outputs, and decoder_class in inputs. If you are a savvy Logstash user, you must surely be thinking about codecs by now. It is both possible to leverage Logstash codecs as well as Kafka serializers to manage message representation into and out of Kafka topics.

Other Logstash codecs that are relevant to the Kafka ecosystem are plain, avro, and avro_schema_registry.

If you wish to write your own serializer/deserializer you can do so in your favorite JVM language. Since these classes are not in Logstash's classpath, you must explicitly add the appropriate library into your java classpath

export CLASSPATH=\$CLASSPATH:/path/to/kafkaserializers.jar; bin/logstash -f ..


Please note that decoding of messages in Logstash input is single threaded, so an expensive serialization format like json will decrease the overall performance of the pipeline.

Conclusion

In this post, we've covered few basic concepts in Kafka and presented its use with the Elastic Stack. In the next post, we'll jump right into the operational aspects, and provide tips for running Kafka with Logstash. Meanwhile, if you have any questions, feel free to reach us on our forum or on twitter!

Update: Want more? Read part 2 of this series for operational and monitoring tips.