22 June 2016 Engineering

Just Enough Kafka For The Elastic Stack, Part 2

By Suyog Rao

Welcome to part 2 of our multi-part Apache Kafka and Elastic Stack post. In our previous post, we introduced use cases of Kafka for the Elastic Stack and shared knowledge about designing your system for time-based and user-based data flow. In this post, we'll focus on the operational aspects: tips for running Kafka and Logstash in production to ingest massive amounts of data.

Capacity Planning

Before we dive deep, a reminder that we are mostly talking about Kafka 0.8 and Logstash 2.x, which is the current stable version. There are newer versions of Kafka (0.9 and, recently, 0.10), but the core concepts discussed here apply to any Kafka version. Without further ado, let's start by discussing the different systems at play here:

Apache ZooKeeper: Kafka has a dependency on ZooKeeper (ZK) — brokers need it to form a cluster, topic configuration is stored in ZK nodes, etc. Plus, in version 2.x of Logstash, the input offsets are stored in ZK as they get acknowledged. Newer versions of Kafka have decoupled the clients — consumers and producers — from having to communicate with ZooKeeper. In Kafka 0.9 and 0.10, offsets are stored in topics by default instead of in ZK. Either way, you still need ZooKeeper to run Kafka brokers. Our general advice is to run 3 ZK instances to achieve a quorum configuration, and all of them on separate hardware. For more information on operationalizing ZK, refer to this excellent section in Kafka docs. From our experience, ZK itself does not need much hand-holding once set up. You just have to make sure the instances are up and are monitored.

Kafka Brokers: The number of Kafka brokers you need typically depends on your data retention and replication strategy. The more brokers you add, the more data you can store in Kafka. In terms of resources, Kafka is typically IO bound. Performance will be limited by disk speed and file system cache; good SSD drives and file system cache can easily support millions of messages per second. You can use topbeat to monitor this information.

Logstash: How many Logstash instances do you need to process the data in Kafka? It is really hard to put a number on this because, frankly, it depends on a lot of variables. Questions like these need to be answered: How many filters do you have? How expensive are your filters? Remember, it is really easy to end up with a complex Grok pattern with multiple conditionals to process your data! What volume of data do you expect? What are all your outputs? As you can see, there's a lot of information we need to gather before providing a number. Oftentimes, it is the outputs (external systems) where you have to focus your capacity planning, not Logstash itself! That being said, you can easily scale Logstash and Elasticsearch horizontally. Therefore, our advice is to start small, and continue to add nodes or new LS instances as your data needs grow.

In particular, for data in Kafka that Logstash consumes, you can group multiple instances into consumer groups. Each group shares the load and instances will handle data exclusively, i.e. each message will be consumed only once, by one client in the group. This design lends itself very cleanly to our original proposition: start small and scale iteratively. Using topics, you can design your workflow such that data which needs more complex transformations, or data that needs to be stored in a slower output, is isolated from other fast-moving data. Remember, in Logstash, a single slow output can block all other outputs which are configured to run after it.

Elasticsearch: As we've mentioned before, Elasticsearch is truly elastic in that you can scale up easily. Capacity planning for Elasticsearch is an entire blog post by itself, and beyond the scope of this article. We recommend you read the following posts which cover the concepts of scaling and sizing Elasticsearch -- Sizing Elasticsearch, Performance Considerations for Elasticsearch Indexing, and others.    

Data Retention

If your Kafka instance is running out of disk space, chances are that your retention time for Kafka logs is too high. In Kafka, you can configure data retention based on 2 criteria: age and size, using the log.retention.hours and log.retention.bytes broker settings respectively. If either one of these criteria is met, the Kafka broker will start deleting messages starting from the oldest, regardless of whether Logstash has consumed them.
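To get a feel for how retention translates into disk usage, here is a rough back-of-the-envelope sketch. It assumes uncompressed messages of a uniform average size and ignores index files and other overhead, so treat the result as a ballpark figure only. The function name and its inputs are hypothetical, chosen for illustration.

```shell
# estimate_retention_gb MSGS_PER_SEC AVG_MSG_BYTES RETENTION_HOURS REPLICATION_FACTOR
# Prints the approximate cluster-wide disk usage, in whole GiB, needed to
# hold RETENTION_HOURS worth of data. Simplified model: no compression,
# no index overhead, constant ingest rate.
estimate_retention_gb() {
  echo $(( $1 * $2 * $3 * 3600 * $4 / 1073741824 ))
}
```

For example, estimate_retention_gb 5000 500 24 2 (5,000 messages/sec of 500 bytes each, kept for 24 hours with 2 replicas) prints 402.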

It is tempting to design data recovery and retention for Elasticsearch by using Kafka's retention tools. From our experience, it is best to use a tool like Curator to manage Elasticsearch's time-based indexes, in addition to configuring a snapshot strategy for restoring indexes on any catastrophic failures. Most often, data in Kafka is raw, unfiltered content, and has multiple destinations, so it is good to not have it tightly coupled to one downstream component.

Offset Management and Message Delivery Guarantees

From Kafka’s documentation:

The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition. Offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages

The Kafka input keeps track of offset information using ZooKeeper. As Logstash pulls messages from the topic and processes them, it periodically commits the offset to ZK. This process is called check-pointing or committing. By default, Logstash check-points to ZK every minute. You can control this frequency by using the auto_commit_interval_ms setting. Be aware that longer times for this setting could lead to data loss in case Logstash is forcefully stopped, or the process crashes. On the other hand, a small time value means increased writes per client, which could overwhelm the ZK cluster.
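As a minimal sketch, the commit frequency can be set on the Kafka input like this. The 10-second value is purely illustrative, not a recommendation; the other settings mirror the ones used elsewhere in this post.

```
input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    # Commit offsets to ZK every 10 seconds (illustrative value only)
    auto_commit_interval_ms => 10000
  }
}
```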

If you restart Logstash, it will first read the offset information stored in ZK and start fetching messages from the previous commit point. Kafka is designed to follow at-least-once semantics: messages are guaranteed not to be lost, but may be redelivered. This means there could be scenarios where Logstash crashes while the offset is still in memory and not yet committed. This can cause messages to be re-delivered or, in other words, duplicated. If this is a concern in your use case, you can work around the potential duplication by generating or using unique IDs in a field for your messages. Whether you write your own code to create these IDs, or use the uuid filter in Logstash, you will have to do this prior to the messages entering Kafka. On the Logstash instance that reads from Kafka and writes to Elasticsearch, you can map this event ID to the document_id option in the Elasticsearch output plugin. This means Elasticsearch will overwrite the indexed document which has the same ID, which is generally preferred over producing multiple documents with the same content!
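For illustration, here is a sketch of what the producing side could look like when Logstash itself writes to Kafka. It assumes the uuid filter and the Kafka output plugin are installed, that the Kafka output's broker settings are left at their defaults, and that events are serialized as JSON so the extra field survives the trip through Kafka. The field name my_uuid is just an example.

```
filter {
  uuid {
    # Field that will hold the generated ID (example name)
    target => "my_uuid"
  }
}
output {
  kafka {
    topic_id => "apache_logs"
    # JSON keeps all event fields, including my_uuid
    codec => json
  }
}
```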

This is also useful if you ever have to replay content, should you lose data downstream. You can use a different consumer group to replay data at its own pace.

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}
....
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}

Where my_uuid is an existing field in the Event.
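To replay content, a separate Logstash instance could use its own consumer group, roughly as sketched below. The group name logstash_replay is hypothetical. The sketch assumes the auto_offset_reset option of the 2.x Kafka input, which tells a group with no committed offsets to start from the oldest available message. Because the output reuses the same document_id mapping, replayed events overwrite existing documents instead of duplicating them.

```
input {
  kafka {
    zk_connect => "kafka:2181"
    # A new group has no committed offsets, so it does not
    # interfere with the "logstash" group (hypothetical name)
    group_id => "logstash_replay"
    topic_id => "apache_logs"
    # Start from the oldest message still retained in Kafka
    auto_offset_reset => "smallest"
  }
}
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}
```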

Monitoring: How do I know if there is a lag?

One of the important things to monitor when using Kafka is how many messages are backed up and waiting to be consumed by Logstash. There are plenty of tools to monitor this information; below are some options:

CLI tool bundled with Kafka

Simple, command line tool to check offsets. You can run it on a cronjob periodically and alert using your favorite alerting software.

/usr/bin/kafka-consumer-offset-checker --group logstash --topic apache_logs --zookeeper localhost:2181

Sample response:

Group    Topic       Pid Offset  logSize Lag     Owner
logstash apache_logs 0   145833  300000  154167  none
logstash apache_logs 1   145720  300000  154280  none
logstash apache_logs 2   145799  300000  154201  none
logstash apache_logs 3   146267  300000  153733  none

The Lag column tells you how many messages you are lagging by.
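If you want to alert on this from a cronjob, a small wrapper along these lines could work. It is a sketch that assumes the column layout of the sample response above, with Lag as the sixth whitespace-separated field. The function name check_lag and the alert message format are made up for this example.

```shell
# check_lag THRESHOLD
# Reads kafka-consumer-offset-checker output on stdin and prints one line
# for every partition whose Lag (6th column) is greater than THRESHOLD.
# Returns 1 if any partition is over the threshold, 0 otherwise.
check_lag() {
  awk -v max="$1" '
    # Only consider rows with a numeric Lag value; this skips the header
    $6 ~ /^[0-9]+$/ && $6 + 0 > max + 0 {
      printf "ALERT %s/%s partition %s lag %s\n", $1, $2, $3, $6
      over = 1
    }
    END { exit (over ? 1 : 0) }
  '
}
```

You would pipe the output of the offset checker command into check_lag with a threshold of your choosing, for example check_lag 100000, and hook the non-zero exit status into your alerting software.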

JMX

Kafka can be easily monitored via JMX with JConsole. To attach JMX to monitor Logstash, you can set these extra Java options before starting Logstash:

export LS_JAVA_OPTS="
 -Dcom.sun.management.jmxremote
 -Dcom.sun.management.jmxremote.port=3000
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.authenticate=false"

If you are running on AWS, don't forget to use the external hostname or IP of the server.

-Djava.rmi.server.hostname=ec2-107-X-X-X.compute-1.amazonaws.com

Elastic Stack

Why yes, you can monitor Kafka by using the Elastic Stack itself. Obviously, it's our favorite option! For this specific case, we use the purpose-built Beat called Kafkabeat, which is written by our very own Dale McDiarmid. This Beat gathers offset and other topic information and stores it in Elasticsearch. You can then analyze consumer lag using Kibana. Coupled with topbeat, which captures system level stats like disk throughput, CPU and memory, we have a powerful solution to monitor Kafka. So now you have all the data in one place to convince your boss to replace those old crufty spinning disks with brand new SSDs! Oh, and in 5.0.0, it gets even better. All this critical information -- application and system level monitoring -- gets rolled into one Beat, called Metricbeat. Savvy?

Ok, back to Kafkabeat for now. Here's how you get started with it:

  1. Clone https://github.com/gingerwizard/kafkabeat
  2. Run make inside the kafkabeat directory.
  3. Deploy Kafkabeat to your Kafka brokers and run it with ./kafkabeat -c kafkabeat.yml

This will collect offset information from all Kafka topics for this broker and index it into Elasticsearch with the following document structure:

{
  "@timestamp": "2016-06-22T01:00:43.033Z",
  "beat": {
    "hostname": "Suyogs-MBP-2",
    "name": "Suyogs-MBP-2"
  },
  "type": "consumer",
  "partition": 0,
  "topic": "apache_logs_test",
  "group": "logstash",
  "offset": 3245,
  "lag": 60235
}

Once the data is in Elasticsearch, it is straightforward to visualize it with Kibana. I've used the hot-off-the-press Kibana 5.0.0-alpha3 to create a dashboard that graphs the consumer lag field over time.

Kibana_Kafka2.png

Kafka Manager

This is an open source UI tool to manage Kafka in its entirety. You can create topics, track metrics, manage offsets and more. Be aware that this tool takes a while to compile and build, but if you need an end-to-end management solution for Kafka you can give this a try! Once you start the Kafka Manager tool, follow instructions to create a new cluster to monitor by pointing it to your ZK instance. 

Consumer View

kafka UI.png

Topic View

Kafka UI 2.png

Conclusion

In this post, we provided tips to operationalize Kafka and Logstash so you can ingest data from multiple sources into Elasticsearch. There's more to come!

Last year, Kafka released version 0.9.0, and recently 0.10.0, which is packed with new features like built-in security, a new consumer implementation, data quotas and more. We've updated the Logstash input and output so you can use these features with Logstash! In the next post, we'll cover the new features in Kafka, and in particular, end-to-end security using Kafka and the Elastic Stack.

'Til next time!