Engineering

Doppelte Dokumente in Elasticsearch finden und entfernen

Viele der Systeme, die Elasticsearch mit Daten versorgen, nutzen die automatisch generierten ID-Werte von Elasticsearch für neu eingefügte Dokumente. Wenn die Datenquelle jedoch versehentlich dasselbe Dokument mehrmals an Elasticsearch sendet und jedes in Elasticsearch eingefügte Dokument eine solche automatisch generierte _id erhält, dann wird dasselbe Dokument in Elasticsearch mehrfach mit unterschiedlichen _id-Werten gespeichert. In diesem Fall kann es notwendig sein, solche Duplikate zu finden und zu entfernen. Daher befassen wir uns in diesem Blogeintrag damit, wie Sie doppelte Dokumente aus Elasticsearch (1) mit Logstash oder (2) mit benutzerdefiniertem Python-Code erkennen und eliminieren können.

Beispielstruktur für Dokumente

Für diesen Blogeintrag gehen wir davon aus, dass die Dokumente im Elasticsearch-Cluster die folgende Struktur haben. Diese Struktur entspricht einem Datensatz mit Dokumenten, die verschiedene Aktienkurse abbilden.

   {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Angesichts dieser Dokumentstruktur legen wir für diesen Blogeintrag fest, dass Dokumente Duplikate sind, wenn sie jeweils dieselben Werte in den Feldern ["CAC", "FTSE", "SMI"] haben.

Elasticsearch-Dokumente mit Logstash deduplizieren

Sie können Logstash verwenden, um doppelte Dokument in einem Elasticsearch-Index zu finden und zu entfernen. Diese Technik wird in unserem Blogeintrag zum Umgang mit Duplikaten in Logstash beschrieben, und in diesem Abschnitt zeige ich Ihnen ein konkretes Beispiel.

Im folgenden Beispiel habe ich eine einfache Logstash-Konfiguration geschrieben, die Dokumente aus einem Index in einem Elasticsearch-Cluster liest, mit dem Blueprint-Filter einen eindeutigen _id-Wert für die einzelnen Dokumente auf Basis eines Hashs über die Felder ["CAC", "FTSE", "SMI"] berechnet und die Dokumente anschließend zurück in einen neuen Index im gleichen Elasticsearch-Cluster schreibt. Dabei werden doppelte Dokumente in dieselbe _id geschrieben und somit eliminiert.

Mit wenigen Änderungen können wir denselben Logstash-Filter auch auf zukünftige Dokumente anwenden, die in den neu erstellten Index geschrieben werden, um Duplikate praktisch in Echtzeit zu entfernen. Dazu können wir den Eingabebereich im folgenden Beispiel so ändern, dass Dokumente aus Ihrer Echtzeit-Eingabequelle akzeptiert werden, anstatt Dokumente aus einem vorhandenen Index abzurufen.

Beachten Sie dabei, dass benutzerdefinierte _id-Werte (_id-Werte, die nicht von Elasticsearch generiert werden) die Schreibleistung Ihrer Indexoperationen beeinträchtigen können.

Je nach verwendetem Hashalgorithmus können bei diesem Ansatz theoretisch auch Hash-Kollisionen für den _id-Wert vorkommen, was dazu führen kann, dass zwei nicht identische Dokument zur gleichen _id zugeordnet werden. In diesem Fall würde eines dieser Dokumente verloren gehen. In der Praxis ist die Wahrscheinlichkeit von Hash-Kollisionen jedoch sehr gering. Eine ausführliche Analyse der verschiedenen Hashfunktionen übersteigt den Rahmen dieses Blogeintrags, aber die im Fingerprint-Filter verwendete Hashfunktion sollte sorgfältig ausgewählt werden, da sie sich auf die Ingestionsleistung und die Anzahl der Hash-Kollisionen auswirkt.

Hier sehen Sie eine einfache Logstash-Konfiguration zum Deduplizieren eines vorhandenen Index mit dem Fingerprint-Filter.

input {
  # Alle Dokumente aus Elasticsearch lesen 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# Dieser Filter wurde am 18. Februar 2019 aktualisiert
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- Neue Zeile seit dem ursprünglichen Beitragsdatum hinzugefügt
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

Benutzerdefiniertes Python-Skript zum Deduplizieren von Elasticsearch-Dokumenten

Ein speicherschonender Ansatz

Falls Sie nicht mit Logstash arbeiten, können Sie Ihre Dokumente mit einem benutzerdefinierten Python-Skript deduplizieren. Für diesen Ansatz berechnen wir den Hash über die Felder ["CAC", "FTSE", "SMI"], die laut unserer Definition ein Dokument eindeutig identifizieren. Anschließend verwenden wir diesen Hash als Schlüssel in einem Python-Wörterbuch, in dem der zugeordnete Wert für jeden Wörterbucheintrag ein Array der _ids der Dokumente enthält, die denselben Hash haben.

Wenn mehr als ein Dokument denselben Hash hat, können die doppelten Dokumente, die denselben Hash haben, gelöscht werden. Falls Sie mögliche Hash-Kollisionen ausschließen möchten, können Sie den Inhalt der Dokumente mit gleichem Hash analysieren, um herauszufinden, ob die Dokumente wirklich identisch sind, und die Duplikate anschließend eliminieren.

Analyse des Erkennungsalgorithmus

Wenn ein Index mit 50 GB Dokumente mit einer durchschnittlichen Größe von 0,4 KB enthält, dann befinden sich etwa 125 Millionen Dokumente im Index. Wenn wir einen MD5-Hash mit 128 Bit verwenden, dann beträgt der Speicherverbrauch für die Deduplizierungsdatenstrukturen im Arbeitsspeicher also 128 Bits x 125 Millionen = 2 GB Arbeitsspeicher, plus 160 Bits x 125 Millionen = 2,5 GB Arbeitsspeicher für die _ids mit jeweils 160 Bits. Dieser Algorithmus benötigt also etwa 4,5 GB Arbeitsspeicher für alle relevanten Datenstrukturen. Dieser Speicherbedarf kann dramatisch reduziert werden, wenn der im nächsten Abschnitt beschriebene Ansatz eingesetzt werden kann.

Erweiterter Algorithmus

In diesem Abschnitt stelle ich Ihnen eine Erweiterung des Algorithmus vor, mit dem Sie den Speicherbedarf reduzieren und neue Duplikate fortlaufend entfernen können.

Wenn Sie Zeitreihendaten speichern und wissen, dass doppelte Dokumente nur in kurzen Abständen zueinander auftreten werden, können Sie den Speicherbedarf des Algorithmus verbessern, indem Sie den Algorithmus immer wieder über eine Teilmenge der Dokumente im Index ausführen, wobei jede Teilmenge einem anderen Zeitfenster entspricht. Wenn Sie beispielsweise Daten für ein Jahr haben, können Sie Bereichsabfragen über Ihr datetime-Feld verwenden (in einem Filterkontext für optimale Leistung), um Ihre Daten Woche für Woche zu durchlaufen. In diesem Fall führen Sie den Algorithmus 52-mal aus (einmal pro Woche), und der Speicherbedarf für den schlechtesten Fall würde um den Faktor 52 reduziert.

Im obigen Beispiel haben Sie sich vielleicht gefragt, was mit Duplikaten passiert, die mehrere Wochen umspannen. Angenommen, Sie wissen, dass doppelte Dokumente nicht mehr als zwei Stunden voneinander getrennt auftreten können. In diesem Fall stellen Sie sicher, dass jede Ausführung des Algorithmus alle Dokumente enthält, die sich um jeweils zwei Stunden mit den Dokumenten aus dem letzten Durchlauf des Algorithmus überschneiden. Für das wöchentliche Beispiel müssten Sie also 170 Stunden (1 Woche + 2 Stunden) an Zeitreihendokumenten abfragen, um sicherzugehen, dass keine Duplikate übersehen werden.

Wenn Sie doppelte Dokumente in regelmäßigen Abständen aus Ihren Indizes entfernen möchten, können Sie diesen Algorithmus für die zuletzt empfangenen Dokumente ausführen. Dabei gilt dieselbe Logik wie oben: Die zuletzt empfangenen Dokumente müssen zusammen mit einem Teil der etwas älteren Dokumente analysiert werden, um sicherzustellen, dass keine Duplikate übersehen werden.

Python-Code zum Erkennen von Duplikaten

Der folgende Code demonstriert, wie Sie Dokumente effizient auf Gleichheit prüfen und bei Bedarf eliminieren können. In diesem Beispiel führen wir jedoch keinen tatsächlichen Löschvorgang aus, um zu verhindern, dass Dokumente versehentlich gelöscht werden. Diese Funktion kann jedoch mühelos hinzugefügt werden.

Sie finden den Code zum Deduplizieren von Dokumenten in Elasticsearch auch auf GitHub.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# Die folgende Zeile definiert die Felder,
# mit denen ermittelt wird, ob ein Dokument ein Duplikat ist
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Von der aktuellen Suche zurückgegebene Dokumente verarbeiten/scrollen
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # Wenn hashval neu ist, generieren wir einen neuen Schlüssel
        # in dict_of_duplicate_docs und weisen ihm
        # ein leeres Array als Wert zu.
        # Anschließend schreiben wir sofort die _id in das Array.
        # Wenn hashval bereits existiert,
        # schreiben wir einfach die neue _id in das vorhandene Array
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Alle Dokumente im Index durchlaufen und die
# Datenstruktur dict_of_duplicate_docs füllen.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Scroll-ID abrufen
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Aktuellen Treffer-Batch verarbeiten und anschließend scrollen
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Aktuellen Treffer-Batch verarbeiten
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Scroll-ID aktualisieren
        sid = data['_scroll_id']
        # Anzahl der im letzten Scroll zurückgegebenen Ergebnisse abrufen
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Hashes der Dokumentwerte nach
    # doppelt vorhandenen Hashes durchsuchen
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Dokumente abrufen, die zum aktuellen hashval zugeordnet wurden
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # In diesem Beispiel geben wir die doppelten Dokumente nur aus.
            # Dieser Code kann problemlos geändert werden,
            # um die Duplikate zu löschen anstatt sie auszugeben.
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

Fazit

In diesem Blogeintrag habe ich Ihnen zwei Methoden zum Deduplizieren von Dokumenten in Elasticsearch vorgestellt. Die erste Methode verwendet Logstash zum Entfernen von doppelten Dokumenten, und die zweite Methode verwendet ein benutzerdefiniertes Python-Skript, um doppelte Dokumente zu finden und zu entfernen.

Falls Sie Fragen zum Deduplizieren von Elasticsearch-Dokumenten oder zu anderen Themen rund um Elasticsearch haben, finden Sie hilfreiche Einblicke und Informationen in unseren Diskussionsforen.