Log deduplication with Elasticsearch

Duplicate events from unhealthy application services make log searching tricky. Check out how to handle duplicates using Logstash, Beats and Elastic Agent.


SREs get flooded with large volumes of logs from noisy applications every day. In his seminal work The Mythical Man Month, Frederick P. Brooks said that "all programmers are optimists." This optimism manifests in software engineers not putting controls in place to stop their applications from sending continuous logs in exceptional failure situations. In large organizations with centralized logging platforms, this flood of events consumes considerable storage and processing compute. On the people side, it leaves SREs feeling overwhelmed and suffering from alert fatigue as they are engulfed by the wave of messages, a bit like this:

 

![Surfer Engulfed by Waves Gif](./images/1.gif)

 

Developers building software, including microservices and key applications, are responsible for ensuring their services do not send duplicate log events and that they send the correct log events at the right level. Nevertheless, situations such as the use of third-party solutions or the maintenance of ageing services mean we cannot always guarantee responsible logging practices have been applied. Even if we drop unnecessary fields, as covered in this piece on pruning fields from incoming log events, we still have an issue with storing large numbers of duplicate events. Here we discuss the challenges of identifying duplicate logs from problematic services and how to deduplicate data using Elastic Beats, Logstash, and Elastic Agent.

What is a duplicate log entry?

Before diving into the various ways of preventing these duplicates from making it into your logging platform, we need to understand what a duplicate is. In my prior life as a software engineer, I was responsible for developing and maintaining a vast ecosystem of microservices. Some had well-considered retry logic that, after some time, would shut the service down gracefully and trigger appropriate alerts. However, not all services are built to handle these cases gracefully. Service misconfiguration can also contribute to event duplication: inadvertently changing the production log level from WARN to TRACE can lead to far more aggressive event volumes that have to be handled by the logging platform.

Elasticsearch automatically generates a unique ID for each ingested document unless an _id is provided at ingestion time. Therefore, if your service is sending repeated alerts, you run the risk of having the same event stored as multiple documents with different IDs. Another cause is the retry mechanism of the tools used for log collection. A notable example is Filebeat, where a lost connection or shutdown can cause its retry mechanism to resend an event until the output acknowledges receipt.
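To see the difference, consider indexing the same event with and without an explicit ID. Below is a minimal Dev Tools sketch; the my-demo-logs index name is purely illustrative:

# Without an ID, every request creates a new document with an auto-generated _id
POST my-demo-logs/_doc
{
  "message": "WARN: Unable to get an interesting response"
}

# With an explicit ID, repeating the request overwrites the same document
PUT my-demo-logs/_doc/1
{
  "message": "WARN: Unable to get an interesting response"
}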

Tools Overview

In this blog, we shall examine four tools from the Elastic ecosystem:

  1. Logstash is a free and open ETL pipeline tool that allows you to ingest, transform, and output data between a myriad of sources, including ingestion into and output from Elasticsearch. The Logstash examples will be used to show how Elasticsearch behaves when ingesting logs with and without a specified ID.
  2. Beats are a family of lightweight shippers that allow us to ingest events from a given source into not just Elasticsearch, but also other outputs, including Kafka, Redis, or Logstash.
  3. Ingest pipelines allow for transformations and enrichment to be applied to documents ingested into Elasticsearch. It's like running the filter part of Logstash directly within Elasticsearch, without the need to have another service running. New pipelines can be created either within the Stack Management > Ingest Pipelines screen or via the _ingest API, as covered in the documentation.
  4. Elastic Agent is a single agent that can execute on your host and send logs, metrics, and security data from multiple services and infrastructure to Elasticsearch using the various supported integrations.

Regardless of the reason for the duplicates, there are several possible courses of action in the Elastic ecosystem.

Ingestion without specified IDs

The default approach is to ignore the problem and ingest all events. When an ID is not specified on a document, Elasticsearch will auto-generate a new ID for each document it receives. Let's take a simple example, available in this GitHub repository, using a basic Express HTTP server. When run, the server exposes a single endpoint that returns a single log message:

{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}

Using Logstash, we can poll the endpoint http://localhost:3000/ every 60 seconds and send the result to Elasticsearch. Our logstash.conf looks like the following:

input {
  http_poller {
    urls => {
      simple_server => "http://localhost:3000"
    }
    request_timeout => 60
    schedule => { cron => "* * * * * UTC"}
    codec => "json"
  }
}
output {
  elasticsearch { 
    cloud_id => "${ELASTIC_CLOUD_ID}" 
    cloud_auth => "${ELASTIC_CLOUD_AUTH}"
    index => "my-logstash-index"
  }
}

Logstash will push each event, and without any ID on the event Elasticsearch will generate a new _id field to serve as a unique identifier for each document:

GET my-logstash-index/_search
{
  "took": 0,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 11,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "my-logstash-index",
        "_id": "-j83XYsBOwNNS8Sc0Bja",
        "_score": 1,
        "_source": {
          "@version": "1",
          "event": {
            "transaction_id": 1,
            "original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
            "data_set": "my-logging-app"
          },
          "message": "WARN: Unable to get an interesting response",
          "@timestamp": "2023-10-23T15:47:00.528205Z"
        }
      },
      {
        "_index": "my-logstash-index",
        "_id": "NT84XYsBOwNNS8ScuRlO",
        "_score": 1,
        "_source": {
          "@version": "1",
          "event": {
            "transaction_id": 1,
            "original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
            "data_set": "my-logging-app"
          },
          "message": "WARN: Unable to get an interesting response",
          "@timestamp": "2023-10-23T15:48:00.314262Z"
        }
      },
      // Other documents omitted
    ]
  }
}

This behavior is consistent across Beats, Ingest Pipelines, and Elastic Agent, as they will forward all received events without additional configuration.
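For instance, a bare-bones Filebeat configuration like the sketch below applies no ID logic, so every harvested line becomes a new document with an auto-generated _id (the paths, host, and API key are placeholders):

filebeat.inputs:
- type: filestream
  id: my-logging-app
  paths:
    - /var/log/*.log

output.elasticsearch:
  hosts: ["https://my-elastic-cluster.com:9200"]
  api_key: "${ELASTIC_API_KEY}"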

BYO-ID

Specifying a unique ID for each event, based on an attribute the event already carries, bypasses the Elasticsearch ID generation step discussed in the previous section. Ingesting a document whose ID already exists in the index will result in Elasticsearch updating that document rather than creating a new one. This does incur an overhead, as the index needs to be searched to check whether a document with the same _id exists. Extending our Logstash example above, the document ID is set by specifying the document_id option in the Elasticsearch output plugin:

# http_poller configuration omitted
output {
  elasticsearch { 
    cloud_id => "${ELASTIC_CLOUD_ID}" 
    cloud_auth => "${ELASTIC_CLOUD_AUTH}"
    index => "my-unique-logstash-index"
    document_id => "%{[event][transaction_id]}"
  }
}

This will set the value of the _id field to the value of event.transaction_id. In our case, this means the new document will replace the existing document on ingestion as both documents have an _id of 1:

GET my-unique-logstash-index/_search
{
  "took": 48,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "my-unique-logstash-index",
        "_id": "1",
        "_score": 1,
        "_source": {
          "@timestamp": "2023-10-23T16:33:00.358585Z",
          "message": "WARN: Unable to get an interesting response",
          "@version": "1",
          "event": {
            "original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
            "data_set": "my-logging-app",
            "transaction_id": 1
          }
        }
      }
    ]
  }
}
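One way to confirm that repeated events are overwriting a single document rather than creating new ones is to fetch the document directly and check its _version, which increments on every overwrite. The response below is illustrative and trimmed:

GET my-unique-logstash-index/_doc/1

{
  "_index": "my-unique-logstash-index",
  "_id": "1",
  "_version": 7,
  "found": true,
  "_source": {
    "message": "WARN: Unable to get an interesting response"
  }
}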

The ID is specified in various ways depending on the tool, as discussed further in subsequent sections.

Beats

JSON is a common format for many sources of logs. If your event already has a useful and meaningful ID that can serve as the unique ID for a document and prevent duplicate entries, you can use either the decode_json_fields processor or the json.document_id input setting, as recommended in the documentation. Using a natural key present within a JSON field of the message is preferable to generating one. Both settings are shown in the below example:

filebeat.inputs:
- type: filestream
  id: my-logging-app
  paths:
    - /var/tmp/other.log
    - /var/log/*.log
  json.document_id: "event.transaction_id"  
# Alternative approach using decode_json_fields processor
processors:
  - decode_json_fields:
      document_id: "event.transaction_id"
      fields: ["message"]
      max_depth: 1
      target: ""

Ingest pipelines

In this case, the ID can be set using a set processor combined with the copy_from option to transfer the value from your unique field to the document's _id metadata field:

PUT _ingest/pipeline/test-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_id",
        "copy_from": "transaction_id"
      }
    }
  ]
}
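You can verify the pipeline behaves as expected with the simulate API before wiring it up to an index; the transaction_id value below is illustrative. The returned document metadata should show the _id copied from transaction_id:

POST _ingest/pipeline/test-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "transaction_id": 1,
        "message": "WARN: Unable to get an interesting response"
      }
    }
  ]
}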

Elastic Agent

Elastic Agent has a similar approach where you can use the copy_fields processor to copy the value to the @metadata._id attribute in the integration:

- copy_fields:
      fields:
        - from: transaction_id
          to: "@metadata._id"
      fail_on_error: true
      ignore_missing: true

When set to true, the fail_on_error setting will revert the changes applied by a failing processor, returning the event to its prior state. Meanwhile, ignore_missing will only trigger a failure for a document with a non-existent source field when it is set to false.

Auto-generated ID

Another option is to generate a unique ID using techniques such as fingerprinting on a subset of event fields. By hashing a set of fields, a unique value is generated that, when matched, will result in Elasticsearch updating the original document on ingest. As this handy piece on handling duplicates with Logstash outlines, the fingerprint filter plugin can be configured to generate an ID with the specified hashing algorithm and store it in the field [@metadata][fingerprint]:

filter {
  fingerprint {
    source => ["event.start_date", "event.data_set", "message"]
    target => "[@metadata][fingerprint]"
    method => "SHA256"
    # Hash the concatenation of all source fields rather than each source separately
    concatenate_sources => true
  }
}
output {
  elasticsearch {
    hosts => "my-elastic-cluster.com"
    document_id => "%{[@metadata][fingerprint]}"
  }
}

Here, the SHA256 algorithm is used to hash the combination |event.start_date|start_date_value|event.data_set|data_set_value|message|message_value|. If we wanted to use one of the other permitted algorithm options, it can be specified using the method option. This will result in Elasticsearch updating the document matching the generated _id:

GET my-fingerprinted-logstash-index/_search
{
  "took": 8,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 1,
    "hits": [
      {
        "_index": "my-fingerprinted-logstash-index",
        "_id": "b2faceea91b83a610bf64ac2b12e3d3b95527dc229118d8f819cdfaa4ba98af1",
        "_score": 1,
        "_source": {
          "@timestamp": "2023-10-23T16:46:00.772480Z",
          "message": "WARN: Unable to get an interesting response",
          "@version": "1",
          "event": {
            "original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
            "data_set": "my-logging-app",
            "transaction_id": 1
          }
        }
      }
    ]
  }
}

If your event does not have a single meaningful identifying field, this can be a useful option, provided you are happy to accept the processing overhead of generating the ID and the potential for collisions, where different events resolve to the same generated hash. Similar capabilities are available in the other tools, as discussed in subsequent sections.

Beats

The add_id processor for Beats and Elastic Agent will allow a unique Elasticsearch-compatible ID to be generated. By default, this value will be stored in the @metadata._id field, which is the ID field for Elasticsearch documents.

filebeat.inputs:
- type: filestream
  id: my-logging-app
  paths:
    - /var/tmp/other.log
    - /var/log/*.log
processors:
  - add_id:
      target_field: "@metadata._id"

Alternatively, the fingerprint processor generates a hashed value from a concatenation of the specified field name and value pairs, separated by the | character.

filebeat.inputs:
- type: filestream
  id: my-logging-app
  paths:
    - /var/tmp/other.log
    - /var/log/*.log
processors:
  - fingerprint:
      fields: ["event.start_date", "event.data_set", "message"]
      target_field: "@metadata._id"
      method: "sha256"
      ignore_missing: false

In the above example, the sha256 hashing algorithm (also the default) is used to hash the combination |event.start_date|start_date_value|event.data_set|data_set_value|message|message_value|. If we wanted to use one of the other permitted algorithm options, it can be specified using the method option. Error handling is also an important consideration, which the ignore_missing option assists with. For example, if the event.start_date field does not exist on a given document, an error will be raised when ignore_missing is set to false. This is the default behavior if ignore_missing is not explicitly set, but it's common to ignore such errors by setting the value to true.

Elastic Agent

Just like Beats, Elastic Agent has an add_id processor that can be used to generate a unique ID, defaulting to @metadata._id if the target_field attribute is not specified:

  - add_id:
      target_field: "@metadata._id"

Alternatively, the fingerprint processor is also available in Elastic Agent and can be applied to any integration that exposes an advanced configuration section with a processors option. The processor logic looks like the below:

  - fingerprint:
      fields: ["event.start_date", "event.data_set", "message"]
      target_field: "@metadata._id"
      ignore_missing: false
      method: "sha256"

Taking the Kafka integration as an example, the above processor snippet can be applied in the processors segment of the advanced configuration section for the Collect logs from Kafka brokers input:

 

![Elastic Agent Kafka Integration Fingerprint Processor](./images/2.png)

 

Just like in Beats, the value that is hashed is constructed as a concatenation of the field names and field values separated by |, for example |field1|value1|field2|value2|. However, just like in Beats and unlike in Logstash, the method value is in lowercase despite supporting the same hashing algorithms.

Ingest pipelines

Here we'll show a sample request to create a pipeline with a fingerprint processor using the _ingest API. Note the similarities between the below configuration and our Beats processors:

PUT _ingest/pipeline/my-logging-app-pipeline
{
  "description": "Fingerprint-based document IDs for my-logging-app",
  "processors": [
    {
      "fingerprint": {
        "fields": ["event.start_date", "event.data_set", "message"],
        "target_field": "_id",
        "ignore_missing": false,
        "method": "SHA-256"
      }
    }
  ]
}
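To apply the pipeline at ingest time, either pass it per request with the pipeline query parameter or set it as the default pipeline on the index. The index name and event.start_date value below are illustrative:

# Apply the pipeline to a single indexing request
POST my-logging-app-index/_doc?pipeline=my-logging-app-pipeline
{
  "event": {
    "start_date": "2023-10-23T15:47:00.000Z",
    "data_set": "my-logging-app"
  },
  "message": "WARN: Unable to get an interesting response"
}

# Or make it the default pipeline for the index
PUT my-logging-app-index/_settings
{
  "index.default_pipeline": "my-logging-app-pipeline"
}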

Aggregate Events

Aggregating events together based on common fields is another option, if the tool you are using supports it. Aggregation comes with a tradeoff, as the tool needs to keep several events in memory to perform the aggregation rather than immediately forwarding them to the output. For this reason, the only tool within the Elastic ecosystem that supports event aggregation is Logstash.

To implement the aggregation-based approach in Logstash, use the aggregate plugin. In our case, it's unlikely that a specific end event will be sent to distinguish between duplicates, meaning a timeout, as in the below example, is needed to control the batching process:

filter {
  grok {
    match => [ "message", "%{NOTSPACE:event.start_date} %{LOGLEVEL:loglevel} - %{NOTSPACE:user_id} - %{GREEDYDATA:message}" ]
  }
  aggregate {
    task_id => "%{event.start_date}%{loglevel}%{user_id}"
    code => "map['error_count'] ||= 0; map['error_count'] += 1;"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "user_id"
    timeout => 600
    timeout_tags => ['_aggregatetimeout']
    timeout_code => "event.set('has_multiple_occurrences', event.get('error_count') > 1)"
  }
}

The above example will push an aggregated event after 600 seconds, or 10 minutes, adding the error_count and has_multiple_occurrences attributes to indicate an aggregated event. The push_map_as_event_on_timeout option ensures that the aggregation result is pushed on each timeout, allowing you to reduce the alert volume. When determining the timeout for your data, consider your volume and opt for the lowest timeout you can, as Logstash holds the aggregation maps in memory until the timeout expires and the aggregated event is pushed.
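For reference, a map pushed on timeout would produce a new event roughly like the one below. The values are illustrative; note that timeout_task_id_field copies the full aggregated task_id value into the user_id field:

{
  "user_id": "2023-10-23T15:47:00WARNuser123",
  "error_count": 42,
  "has_multiple_occurrences": true,
  "tags": ["_aggregatetimeout"],
  "@timestamp": "2023-10-23T15:57:00.000Z"
}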

Conclusions

Log volume spikes can quickly overwhelm logging platforms and the SREs looking to maintain reliable applications. We have discussed several approaches to handling duplicate events using Elastic Beats, Logstash, and Elastic Agent, with the examples available in this GitHub repository.

When generating IDs via a hashing algorithm using fingerprint processors, or when aggregating events, choose the attributes carefully to balance preventing a flood against obscuring legitimate streams that point to a large-scale problem in your ecosystem. Both approaches carry an overhead: processing time to generate the ID, or memory to hold the documents eligible for aggregation.

Selecting an option really depends on which events you consider duplicates and the performance trade-offs you can accept. As discussed, when you specify an ID, Elasticsearch needs to check for an existing document matching that ID before adding the document to the index, which introduces a slight delay in ingestion for the _id existence check.

Using hashing algorithms to generate the ID adds further processing time, as the ID needs to be generated for each event before it is compared and potentially ingested. Choosing not to specify an ID bypasses this check, as Elasticsearch will generate the ID for you, but it results in all events being stored, which increases your storage footprint.

Dropping full events is a legitimate practice not covered in this piece. If you want to drop log entries to reduce your volume, check out this piece on pruning fields from incoming log events.

If your favorite way to deduplicate events is not listed here, do let us know!

Resources

  1. Elastic Beats
  2. Filebeat | Fingerprint processor
  3. Filebeat | Decode JSON fields processor
  4. Filebeat | Filebeat deduplication
  5. Logstash
  6. Logstash | Fingerprint plugin
  7. Logstash | Aggregate plugin
  8. Elastic Agent
  9. Elastic Agent | Fingerprint processor
  10. Ingest Pipelines
  11. Elasticsearch | Ingest pipeline fingerprint processor