
Little Logstash Lessons: Handling Duplicates


Editor's Note (August 3, 2021): This post uses deprecated features. Please reference the map custom regions with reverse geocoding documentation for current instructions.

This post describes approaches for de-duplicating data in Elasticsearch using Logstash. Depending on your use case, duplicated content in Elasticsearch may not be acceptable. For example, if you are dealing with metrics, duplicated data in Elasticsearch may lead to incorrect aggregations and unnecessary alerts. Even for certain search use cases, duplicated data could lead to bad analysis and search results.

Background: Elasticsearch Indexing

Before we get to the de-duplication solutions, let's take a brief detour into Elasticsearch's indexing process. Elasticsearch provides a REST API for indexing your documents. You have a choice of providing an ID that uniquely represents your document, or you can let Elasticsearch generate one for you. If you use the HTTP PUT verb with the index API, Elasticsearch expects you to supply an ID. If a document already exists with the same ID, Elasticsearch replaces the existing contents with the ones you just provided; the last indexed document wins. If you use the POST verb, Elasticsearch generates a new document with a new ID even if the same content is already present in the corpus. For example, if you indexed a blog post a second ago and resend the same blog post using the POST verb, Elasticsearch creates another document with the same content but a new ID.
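To make the difference concrete, here is a minimal sketch of the two calls against the index API (the index name, document ID, and field values are illustrative placeholders, not from the original post):

# PUT with an explicit ID: creates the document, or replaces it if ID 1 already exists
PUT my-index/_doc/1
{
  "title": "Handling duplicates"
}

# POST without an ID: always creates a new document with an auto-generated ID,
# even if an identical document already exists
POST my-index/_doc
{
  "title": "Handling duplicates"
}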

While Elasticsearch provides an explicit _update API that can be used as a potential workaround, this post focuses on just the indexing API.

Logstash's Elasticsearch output uses the indexing API and, by default, does not expect an ID to be supplied. Hence, it treats every single event as a separate document. However, there is an option that lets you easily set a unique ID for every event in Logstash.

Bring your own ID

If your data source already has an ID, it is easy to set this as the document ID before indexing into Elasticsearch. For example, users of JDBC input could easily use the primary key from the source table as the Elasticsearch ID. Using the field reference syntax, it is straightforward to set the document ID in the output section:

output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[upc_code]}"
  }
}

where upc_code is a field in your data. This field may have come from your structured log format or been extracted using a grok filter.
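For instance, if you are loading data with the JDBC input mentioned above, a configuration along these lines would use the source table's primary key as the document ID (a sketch only; the connection settings, driver, and table are illustrative assumptions):

input {
  jdbc {
    jdbc_connection_string => "jdbc:postgresql://localhost:5432/inventory"
    jdbc_user => "logstash"
    jdbc_driver_library => "/path/to/postgresql-jdbc.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    # upc_code is assumed to be the primary key of the source table
    statement => "SELECT upc_code, name, price FROM products"
  }
}
output {
  elasticsearch {
    hosts => "example.com"
    # Re-running the query updates existing documents instead of creating duplicates
    document_id => "%{[upc_code]}"
  }
}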

De-duplicating similar content

As we mentioned earlier, duplicated content may not be acceptable in your use case. Using a concept called fingerprinting and the Logstash fingerprint filter, you can create a new string field called fingerprint that uniquely identifies the original event. The fingerprint filter can take one or more fields of your original event (the message field is the default) as the source for a consistent hash. Once these fingerprints are created, you can use them as the document ID in the downstream Elasticsearch output. Because identical source fields always produce the same fingerprint, Elasticsearch will update or overwrite the existing document with that ID rather than creating a duplicate. If you would like to consider more fields for de-duplication, the concatenate_sources option is the way to go (see the sketch after the first example below).

The fingerprint filter has multiple algorithms you can choose to create this consistent hash. Please refer to the documentation as each function varies in the strength of the hash and may require additional options. In the following example, we are using the MURMUR3 method to create a hash from the message field and set it in a metadata field. Metadata fields are not sent to outputs, so they provide an efficient way to store data temporarily while processing events in the pipeline.

filter {
  fingerprint {
    source => "message"
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][fingerprint]}"
  }
}
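If you want the fingerprint to cover several fields, a variation along these lines could be used; this is only a sketch, and the source field names here are hypothetical. Setting concatenate_sources tells the filter to compute a single hash over all of the listed fields together.

filter {
  fingerprint {
    # Hash the combination of these fields rather than each one separately
    source => ["hostname", "timestamp", "message"]
    concatenate_sources => true
    target => "[@metadata][fingerprint]"
    method => "MURMUR3"
  }
}
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][fingerprint]}"
  }
}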

If you are using any of the cryptographic hash algorithms (like SHA1 or MD5), you are required to provide the key option. The key can be any arbitrary string and is used to compute the HMAC.

filter {
  fingerprint {
    source => "message"
    target => "[@metadata][fingerprint]"
    method => "SHA1",
    key => "Log analytics",
    base64encode => true
  }
}
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][fingerprint]}"
  }
}

Other examples of keys are a department ID, an organization ID, and so on.

Accidental Duplicates: Generating a UUID from Logstash

The previous use case dealt with deliberate de-duplication of the content. In certain deployments, especially when Logstash is used with persistent queues or other queuing systems that guarantee at-least-once delivery, duplicates can still end up in Elasticsearch: when Logstash crashes while processing, the data in the queue is replayed on restart, which can result in the same events being indexed more than once. To reduce duplication resulting from such scenarios, you can generate a UUID for every event. The important point here is that the UUID needs to be generated on the producer side (i.e., the Logstash instance that publishes to the queuing system) before the data is serialized to the message queue. This way, the Logstash consumer keeps the same event ID if it needs to reprocess events when recovering from a crash or restart.

The same fingerprint filter can be used to generate UUIDs if your source data does not have a unique identifier. Keep in mind, this approach does not consider the contents of the event itself but generates a version 4 UUID for every event.

filter {
  fingerprint {
    target => "%{[@metadata][uuid]}"
    method => "UUID"
  }
}
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[@metadata][uuid]}"
  }
}

If you are using a queue between your Logstash producer and consumer, remember that @metadata fields are not persisted to outputs, so a UUID stored there will not survive the trip through the queue. One option is to copy it into a regular field explicitly before the event is serialized.
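A minimal sketch of that approach, using a mutate filter to copy the metadata field (the generated_id field name is just an illustration):

filter {
  fingerprint {
    target => "[@metadata][uuid]"
    method => "UUID"
  }
  # Copy the UUID into a regular field so it survives serialization to the queue
  mutate {
    add_field => { "generated_id" => "%{[@metadata][uuid]}" }
  }
}

Alternatively, you can have the fingerprint filter write the UUID to a regular field in the first place: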

filter {
  fingerprint {
    target => "generated_id"
    method => "UUID"
  }
}
output {
  kafka {
    brokers => "example.com"
    ...
  }
}

From the consumer side, you can then just use:

input {
  kafka {
    brokers => "example.com"
  }
}
output {
  elasticsearch {
    hosts => "example.com"
    document_id => "%{[generated_id]}"
  }
}

Conclusion

As you've seen in this post, the fingerprint filter can serve multiple purposes and is a plugin you should familiarize yourself with in the Logstash ecosystem. Take a stab at using this filter and let us know what you think!

Editor's Note: To learn more about handling duplicate documents with Logstash and/or Python, check out our How to Find and Remove Duplicate Documents in Elasticsearch blog.