Kafka is an open-source, distributed event streaming and queuing platform widely used with Elastic to build high-throughput, large-scale data pipelines, facilitate seamless data integration, and support mission-critical applications. System designs with Kafka significantly enable the decoupling of components within the data pipeline ensuring scalability and a robust design for failure by managing downstream back-pressure during traffic surges, maintenance activities, or any other periods of performance degradation.
In addition to its queuing capabilities, Kafka can serve as a central processing middleware for data pre-processing and enrichment. This is particularly useful when such operations are impractical to perform directly downstream due to specific business or technical requirements or constraints.
For instance, integrating Kafka with stream processing engines like KsqlDB or Materialize, allows for advanced stream processing tasks, including SQL-based joins across topics and streams to enrich data at scale in real-time. The enriched datasets can then be ingested into Elasticsearch for further processing at subsequent stages.
Despite these benefits, adopting Kafka or similar queuing systems is arguably conditional. These systems introduce additional costs and complexity to the overall platform implementation and maintenance. They may also add processing overhead, delay data flow to the downstream, and risk becoming bottlenecks if not correctly sized or optimized to align with other pipeline components.
This article provides guidance for troubleshooting ingestion bottlenecks in data pipelines built with Kafka and Elastic. Identifying and fixing such issues can be sometimes challenging, particularly when multiple changes are made across multiple systems aspects at the same time, which often increases the number of variables in play. This commonly results in a longer process and inconsistent results.
Consider the below Security Operations Center (SOC) platform, where data is ingested from various sources via Elastic Agent. The data is queued and pre-processed in a Kafka cluster before being pulled by Logstash and forwarded to Elastic Security. In this environment, delays at any stage of the pipeline can result in critical security events going undetected by Elastic Security, emphasizing the importance of a well-optimized data pipeline.
Implement lag and throughput monitoring
Ingestion bottlenecks usually materialize as limited throughput and event lags, which often correlate. Monitoring these two indicators is important to measure the impact of tuning attempts.
Tip: With the anomaly detection features of machine learning you can use the Logs Anomalies page to detect and inspect log anomalies and the log partitions where the log anomalies occur.
End-to-end lag monitoring can be broken down into the various stages of the pipeline. The incremental improvements across those stages would collectively contribute to a significant reduction in the end-to-end lag:
A) Ingest lag between the source and Kafka: This lag is the time difference between the real event-time, which is typically extracted from the event itself or added by the event producer (Elastic Agent for example), and the Kafka record timestamp, which can be added to the Logstash events via event decoration in the Kafka input plugin.
In most cases, this lag is influenced by the write performance of the Kafka cluster and network latency between the event source and Kafka. In some cases, the lag may also appear due to time configuration mismatches that make it look like there's a lag when there really isn't.
B) Ingest lag between Kafka and Logstash: This lag is the time difference between the Kafka record's timestamp and the execution timestamp of the first filter in the Logstash pipeline. If your pipelines are using a persistent queue, note that this duration also includes the time spent in the PQ.
The below Ruby filter adds the current-time to the event in the `logstash.start` field to use for comparison later.
ruby {
code => "event.set(logstash.start, Time.now());"
}
The primary factors contributing to ingestion lag include the consumption performance of the Kafka cluster, the Logstash input performance, data skew across the different topic partitions, and most importantly, the backpressure propagation to the Logstash input plugin, because Logstash does not fetch new events from the Kafka topic as quickly as they become available, when it is busy processing the events that it has already fetched.
Network latency and reduced size of TCP read buffer (SO_RCVBUF) on the Logstash host can also throttle Logstash from fetching the data from Kafka at the required rate.
Consumer lag serves as an effective indicator of this issue and can be viewed on Kafka's consumer group metrics. It is calculated as the difference between the log-end offset (the offset of the most recently produced message) and the current offset (the last committed offset by the consumer) for each partition.
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server <server:port> --describe --group <group_id>
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
logstash-cg-soc-1 windows-events 0 4498 17309 12811
logstash-cg-soc-1 windows-events 1 4470 17213 12743
...
C) Ingest lag in the Logstash processing: This lag is the time difference between the first and last Logstash filters. To calculate this lag, an additional filter can be added at the end of the pipeline to record the `logstash.end` timestamp in the same way the `logstash.start` field was added before. The primary factors contributing to this lag are the filters efficiency of processing, which is primarily affected by the complexity and optimization of the transformations they perform, access to external services for data loading which might require network, limited number of the pipeline’s workers and small batch size, and the amount of resources available for Logstash – particularly when running on virtual environments with resources contention.
D) Ingest lag between Logstash and Elasticsearch: This lag is the time difference between the last applied Logstash filter in the pipeline, and the timestamp when the event is ingested in Elasticsearch. The ECS field `event.ingested` is automatically added by the Elastic integrations to record this value. For custom sources, the field should be added via an ingest pipeline:
{
"processors": [
{
"set": {
"field": "event.ingested",
"value": "{{_ingest.timestamp}}"
}
}
…
If the data is undergoing heavy processing in Elasticsearch before indexing, it also pays to analyze the performance of each ingest processor in the pipeline to pinpoint and optimize the heaviest ones. Ingest pipelines monitoring dashboard can help streamline this process.
The primary factors contributing to this phase’s lag are usually the Logstash output configuration like a small number of pipeline workers and batch size, slow indexing actions (like upserts), network latency, and how fast the Elasticsearch cluster can run the ingest pipelines and index the data. You can find more techniques about this last point here.
Visualizing these stages in Kibana helps identify the most throttled areas and analyze the impact of various parameter adjustments across the entire data pipeline during the tuning process.
Isolate and fix the bottleneck
Identifying the source of the bottleneck can be challenging without a systematic approach to isolating the behavior of each component and stage of the pipeline. To make the investigation approach more consistent, it is important to keep the source data consistent as well. One approach can be to use a dedicated topic with a replicated production workload, and repeat the test using different consumer groups.
Below is a set of benchmarks that can be driven while monitoring the event lag and the pipeline throughput. The best achieved results from each of the tuning exercises can be used as a basis for the next one.
First benchmark: Kafka input, no filters, null output
This benchmark is aimed at assessing the throughput of the Kafka input in isolation, excluding the downstream impacts of the Logstash filters and outputs. Use the sink plugin in the output section to discard the events without incurring IO overhead and get a theoretical maximum reading speed.
This test is better performed with and without a persistence queue to isolate the additional overhead at this stage.
It is helpful to use a unique consumer group_id for this test instead of the default `logstash`. Otherwise, this null-output pipeline might consume and drop events that should be processed by other pipelines.
input {
kafka {
...
}
}
filter {
}
output {
sink { }
}
If the throughput from this test closely matches the original pipeline, then most probably you have a closed valve upstream and consuming the events is definitely a bottleneck.
Note that the maximum throughput is significantly impacted by the Kafka cluster's ability to handle consumer requests and network latency. The maximum throughput is also bound by the rate of events that is flowing into the Kafka topic once the consumer group has caught up with the topic.
A few things might be considered in this exercise:
-
Match consumers count to partitions count: Ideally, the total number of consumer threads across all the pipelines that share the same consumer group_id, should be equal to the number of topic partitions for a perfect balance. Each Kafka topic-partition can be assigned to at-most one consumer within a consumer group at a time. So if you have more consumer threads than your topic partitions, some of those threads will not be assigned a partition. Partition-replicas do not count, as consumer threads consume messages from the leader partitions, not directly from replicas. Exceeding 1:1 ratio may also introduce unnecessary computational overhead in Logstash without any gains in read throughput. Incrementally increasing the partition count in the topic can potentially improve the throughput. Kafka 4.0 introduces early access to KIP932, which bypasses this 1:1 mapping requirement using share groups implementing a queuing semantic to the consumption model. The Share Groups are not supported in Logstash yet.
-
Tune the input parameters for maximum throughput: Increasing
max.poll.records,fetch.max.bytes, andreceive.buffer.bytescan enhance performance. The TCP read buffer size is rarely an issue but can also be significantly important. This setting is bound by thenet.core.rmem_maxvalue. -
Use fast disks with enough space if using persistent queues: The queue sits between the input and filter stages in the same process. The I/O performance of the storage directly impacts the input throughput. When the queue is full, Logstash puts back pressure on the inputs to stall the data flow.
Second benchmark: Kafka input, filters, no outputs
This benchmark helps measure the impact of the filters on the input throughput using the best achieved input configuration from the first exercise. It quantifies the throttling effect on the input stream only caused by the events processing. Note that some filter plugins are also IO-bound, like the plugins that use the network to enrich the events.
input {
kafka {
...
}
}
filter {
...
}
output {
}
To increase the number of simultaneously processed events by the filters, try increasing the number of pipeline workers and the pipeline batch size, particularly if the pipeline worker\_utilization flow metric is near 100 and Logstash is not spending all available CPU. Increasing the workers number past the number of available processors can also yield better results as some of the filter plugins may spend significant time in an I/O wait state like external lookups.
Increasing the number of workers past the number of available processors can also improve performance, as some filter plugins may spend considerable time in an I/O wait state, such as during external lookups. This also makes a more efficient use of the Logstash host resources.
Optimizing the pipeline filters is the most effective approach to resolving this bottleneck. It can significantly reduce latency and increase the throughput regardless of the pipeline input configuration and Logstash resources. The per-plugin worker_utilization and worker_millis_per_event flow metrics are very useful in identifying where most of the resources are being spent, and consequently, where these improvements should focus first.
Optimizing pipeline filters is the most effective way to address this bottleneck. it can significantly reduce latency and boost throughput, regardless of the pipeline's input configuration or available resources. The per-plugin worker_utilization and worker_millis_per_event flow metrics are useful for finding which plugins are spending the most resources, and the optimization efforts should focus on those plugins first. Some general best practices that can usually make improvements are utilizing anchors for Grok plugins, switching to faster plugins like dissect whenever possible, optimizing Ruby filters code, eliminating unnecessary parsing, and improving the network-based enrichments.
Source: do you grok
In some cases, optimizing the pipeline may require a complete redesign of the ingestion workflow or the pipeline itself!
Third benchmark: Kafka input, no filters, Elasticsearch output
This benchmark helps quantify the throttling effect of the Elasticsearch output on the input throughput. The test can be divided into two phases: the first phase uses raw logs to isolate the impact of Elasticsearch indexing, while the second phase assesses the impact of ingest pipelines.
In case a pipeline is using multiple outputs, note that, by default, a pipeline is blocked if any single output is blocked. This behavior is important in guaranteeing at-least-once delivery of data, but can cause the outputs to perform at the rate of the most clogged one.
input {
kafka {
...
}
}
filter {
}
output {
Elasticsearch {
...
}
}
To increase throughput, consider progressively increasing the number of pipeline workers and the pipeline batch size. Prior guidance about the worker\_utilization flow metric applies here too although availability of CPU plays a smaller role since this output is mostly IO-bound. Also keep looking for the Elasticsearch Output's rejection rates (e.g.: response code 429 `es_rejected_execution_exception` indicating explicit back-pressure) as a signal that the Elasticsearch cluster is busy processing other batches.
The Logstash output tries to send batches of events to the Elasticsearch Bulk API in a single request. However, if a batch exceeds 20 MB, the plugin splits it into multiple bulk requests.
If the Elasticsearch cluster is behind a proxy or API gateway, it's important to adjust the proxy limits to allow Logstash requests with large payloads to pass through to the Elasticsearch cluster. By default, most proxy servers have a much smaller maximum size for HTTP request payloads, which should be tuned in this case to accommodate larger requests. To identify potential issues, look for error code 413 in your proxy logs, as this indicates that the size of the Logstash request has exceeded the maximum payload size the proxy is configured to handle.
On the Elasticsearch cluster, tune your ingest pipelines efficiency following the same general best practices discussed above for the Logstash pipelines. Also, tune for the indexing speed by using faster hardware, less index refreshes, auto-generated IDs, and consider increasing the number of primary shards to enhance indexing parallelism if you have multiple nodes. Beware that excessively increasing this number can negatively impact the search performance.
Finally, keep in mind that the Elasticsearch output plugin is mostly IO-bound, which means that your network latency and bandwidth significantly reduce the rate at which data is transferred and hence your output throughput.
Reassemble your pipeline
After tuning the pipeline in each of the previous phases separately, put all the parts together again to assess the real throughput and latency of the reassembled pipeline. At this last step, you should have reached the best performance from your Logstash host as well, and you can progressively add more instances to reach the ultimate latency and throughput you are aiming for for a specific topic or data source.
Example
Below is an example of the configuration required on Logstash and Elasticsearch to implement the architecture above.
Logstash pipeline:
input {
kafka {
bootstrap_servers => "<server>:<port>"
topics => ["<topic-id>"]
group_id => "<consumer-group-id>"
decorate_events => "extended"
auto_offset_reset => "earliest"
codec => json {
}
}
}
filter {
ruby {
code => "event.set('[logstash][start]', Time.now());"
}
mutate {
add_field => {
"[kafka][timestamp]" => "%{[@metadata][kafka][timestamp]}"
"[kafka][offset]" => "%{[@metadata][kafka][offset]}"
"[kafka][consumer_group]" => "%{[@metadata][kafka][consumer_group]}"
"[kafka][topic]" => "%{[@metadata][kafka][topic]}"
}
}
date {
match => ["[kafka][timestamp]", "UNIX", "UNIX_MS"]
target => "[kafka][timestamp]"
}
...
ruby {
code => "event.set('[logstash][end]', Time.now());"
}
}
output {
elasticsearch {
hosts => "hosts"
api_key => "api_key"
data_stream => true
ssl => true
}
}
Create an ingest pipeline for lag calculation. Note that when using Elastic integrations, the ECS fields: \ \*.end\, \ \*.start\, \ \*.timestamp\ are automatically mapped as a date.
PUT _ingest/pipeline/calculate_ingest_lag
{
"processors": [
{
"set": {
"field": "event.ingested",
"value": "{{_ingest.timestamp}}",
"ignore_failure": true
}
},
{
"script": {
"lang": "painless",
"if": "ctx['@timestamp'] != null && ctx?.kafka?.timestamp != null && ctx?.logstash?.start != null && ctx?.logstash?.end != null && ctx?.event?.ingested != null",
"source": """
ctx.lag_in_millis = [:];
ctx.lag_in_millis.src_kfk = Duration.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['kafka']['timestamp'])).toMillis();
ctx.lag_in_millis.kfk_ls = Duration.between(ZonedDateTime.parse(ctx['kafka']['timestamp']), ZonedDateTime.parse(ctx['logstash']['start'])).toMillis();
ctx.lag_in_millis.within_ls = Duration.between(ZonedDateTime.parse(ctx['logstash']['start']), ZonedDateTime.parse(ctx['logstash']['end'])).toMillis();
ctx.lag_in_millis.ls_es = Duration.between(ZonedDateTime.parse(ctx['logstash']['end']), ZonedDateTime.parse(ctx['event']['ingested'])).toMillis();
ctx.lag_in_millis.end_end = Duration.between(ZonedDateTime.parse(ctx['@timestamp']), ZonedDateTime.parse(ctx['event']['ingested'])).toMillis();
"""
}
}
]
}
Use the pipeline to add the lag calculation to your Elastic integrations
PUT _ingest/pipeline/logs-system.integration@custom
{
"processors": [
{
"pipeline": {
"name": "calculate_ingest_lag",
"ignore_missing_pipeline": true,
"description": "add ingest lag calculation to elastic_agent integration"
}
}
]
}
Kibana Dashboard and Alerts
Using the metrics mentioned above along with the Log Rate ML job, you can set up Kibana alerts to trigger when with anomalous changes in throughput or delays or simply when delays exceed defined thresholds.
Time to try it out
Start your free 14-day trial of Elastic Cloud to experience the latest version of Elastic. Also, make sure to take advantage of the Elastic threat detection training to set yourself up for success.