Ingeniería

Cómo encontrar y eliminar documentos duplicados en Elasticsearch

Muchos sistemas que dirigen datos a Elasticsearch aprovecharán los valores id autogenerados de Elasticsearch para los documentos recientemente insertados. Sin embargo, si la fuente de datos envía accidentalmente el mismo documento varias veces a Elasticsearch y se usan dichos valores _id autogenerados para cada documento que Elasticsearch inserta, entonces este mismo documento se almacenará varias veces en Elasticsearch con diferentes valores _id. En este caso, es posible que resulte necesario encontrar y eliminar los duplicados. Por lo tanto, en este blog abordamos cómo detectar y eliminar documentos duplicados de Elasticsearch (1) usando Logstash o (2) usando un código personalizado de Python.

Ejemplo de estructura del documento

En este blog, suponemos que los documentos en el cluster de Elasticsearch tienen la estructura siguiente. Esto corresponde a un set de datos que contiene documentos que representan operaciones del mercado de valores.

   {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Dado este ejemplo de estructura del documento, en este blog suponemos arbitrariamente que si varios documentos tienen los mismos valores en los campos ["CAC", "FTSE", "SMI"], son duplicados entre sí.

Uso de Logstash para la deduplicación de documentos de Elasticsearch

Logstash puede usarse para detectar y eliminar documentos duplicados de un índice de Elasticsearch. Esta técnica se describe en este blog sobre el manejo de duplicados con Logstash, y en esta sección se muestra un ejemplo concreto que aplica este enfoque.

En el ejemplo siguiente, escribimos una configuración simple de Logstash que lee documentos de un índice en un cluster de Elasticsearch, luego usa el filtro de huellas digitales para calcular un valor _id único para cada documento basado en un hash de los campos ["CAC", "FTSE", "SMI"] y, por último, vuelve a escribir cada documento en un índice nuevo en ese mismo cluster de Elasticsearch, de modo que los documentos duplicados se escribirán con el mismo _id y se eliminarán.

Además, también se podría aplicar el mismo filtro de Logstash con pequeñas modificaciones a futuros documentos escritos en el índice recientemente creado para garantizar la eliminación de los duplicados casi en tiempo real. Esto podría lograrse cambiando la sección de entrada en el ejemplo a continuación para que acepte documentos de la fuente de entrada en tiempo real, en lugar de extraer los documentos de un índice existente.

Ten en cuenta que usar valores _id personalizados (es decir, un _id que no genera Elasticsearch) afectará el rendimiento de escritura de las operaciones del índice.

También vale la pena mencionar que según el algoritmo de hash usado, este enfoque puede teóricamente resultar en una cantidad de conflictos de hashes distinta de cero para el valor _id, lo que teóricamente podría resultar en la asignación de dos documentos no idénticos al mismo _id y provocar la pérdida de uno de estos documentos. En la mayoría de los casos prácticos, la probabilidad de un conflicto de hashes posiblemente sea muy baja. El análisis detallado de las diferentes funciones de hash excede el alcance de este blog, pero la función de hash usada en el filtro de huellas digitales debe considerarse detenidamente debido a que afectará el rendimiento de ingesta y la cantidad de conflictos de hashes.

A continuación, se proporciona una configuración simple de Logstash para deduplicar un índice existente usando el filtro de huellas digitales.

input {
  # Leer todos los documentos de Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# Filtro actualizado el 18 de febrero de 2019
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- Línea nueva agregada posteriormente a la fecha de publicación original
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

Script personalizado de Python para la deduplicación de documentos de Elasticsearch

Un enfoque eficiente desde el punto de vista de la memoria

Si no se usa Logstash, la deduplicación puede lograrse de forma eficiente con un script personalizado de Python. Para este enfoque, calculamos el hash de los campos ["CAC", "FTSE", "SMI"] que definimos para identificar de manera inequívoca un documento. Luego usamos este hash como una clave en un diccionario de Python, en el cual el valor asociado de cada entrada del diccionario será una matriz de las _ids de los documentos mapeados al mismo hash.

Si más de un documento tiene el mismo hash, los documentos duplicados mapeados al mismo hash pueden eliminarse. Como alternativa, si te preocupa la posibilidad de conflictos de hashes, es posible examinar los contenidos de los documentos mapeados al mismo hash para comprobar que realmente sean idénticos y, si es así, pueden eliminarse los duplicados.

Análisis del algoritmo de detección

Si suponemos que un índice de 50 GB contiene documentos de un tamaño promedio de 0.4 kB, habría 125 millones de documentos en el índice. En este caso, la cantidad de memoria necesaria para almacenar las estructuras de datos de deduplicación en la memoria al usar un hash md5 de 128 bits sería de aproximadamente 128 bits x 125 millones = 2 GB, y los _ids de 160 bits requerirán otros 160 bits x 125 millones = 2.5 GB de memoria. Por lo tanto, este algoritmo requerirá aproximadamente 4.5 GB de RAM para mantener todas las estructuras de datos relevantes en la memoria. Es posible reducir drásticamente este consumo de memoria si puede aplicarse el enfoque que se analiza en la sección siguiente.

Mejora del algoritmo

En esta sección, presentamos una mejora del algoritmo para reducir el uso de la memoria y eliminar de forma continua nuevos documentos duplicados.

Si almacenas datos temporales y sabes que los documentos duplicados solo se producirán con poca diferencia de tiempo entre sí, entonces podrás mejorar el consumo de memoria de este algoritmo ejecutándolo reiteradamente en un subconjunto de los documentos del índice, donde cada subconjunto corresponda a una ventana de tiempo diferente. Por ejemplo, si cuentas con datos de todo un año, podrías usar búsquedas de rango en el campo datetime (dentro de un contexto de filtro para un mejor rendimiento) para avanzar por el conjunto de datos una semana a la vez. Esto requeriría ejecutar el algoritmo 52 veces (una por semana); y en este caso, este enfoque reduciría por un factor de 52 el consumo de memoria correspondiente al peor de los casos.

Es posible que en el ejemplo anterior te preocupe no detectar documentos duplicados que se extienden entre semanas. Supongamos que sabes que no pueden producirse documentos duplicados con más de 2 horas de diferencia. Entonces necesitarías asegurarte de que cada ejecución del algoritmo incluya documentos que se superponen por el lapso de 2 horas con el último conjunto de documentos analizado en la ejecución anterior del algoritmo. En el caso del ejemplo semanal, necesitarías buscar documentos temporales por 170 horas (1 semana + 2 horas) para garantizar que no se pase por alto ningún duplicado.

Si deseas eliminar periódicamente los documentos duplicados de tus índices de forma continua, puedes ejecutar este algoritmo en los documentos recibidos recientemente. Aplica la misma lógica anterior: asegúrate de incluir los documentos recibidos recientemente en el análisis y de superponer ligeramente los documentos anteriores para garantizar que no se hayan pasado por alto duplicados inadvertidamente.

Código de Python para detectar documentos duplicados

El código siguiente muestra cómo se pueden evaluar los documentos de forma eficiente para comprobar si son idénticos y posteriormente eliminarlos, si así lo deseas. Sin embargo, para prevenir eliminaciones accidentales de documentos, en este ejemplo no ejecutamos realmente ninguna operación de eliminación. Sería sencillo incluir dicha funcionalidad.

El código para deduplicar documentos de Elasticsearch también puede encontrarse en GitHub.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# La línea siguiente define los campos que
# se usarán para determinar si un documento es un duplicado.
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Procesar documentos que devuelve search/scroll actual
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # Si el hashval es nuevo, crearemos una clave nueva
        # en dict_of_duplicate_docs, a la que se
        # asignará un valor de una matriz vacía.
        # Luego insertamos inmediatamente el _id en la matriz.
        # Si el hashval ya existe, entonces
        # solo insertaremos el _id nuevo en la matriz existente.
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Realiza el bucle por todos los documentos del índice y completa
# la estructura de datos dict_of_duplicate_docs.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Obtén la ID de scroll.
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Antes del scroll, procesa el batch actual de resultados.
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Procesa el batch actual de resultados.
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Actualiza la ID de scroll.
        sid = data['_scroll_id']
        # Obtén la cantidad de resultados del último scroll.
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Busca en el hash de valores de documentos para comprobar
    # si se encontraron hashes duplicados.
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Obtén los documentos mapeados al hashval actual.
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # En este ejemplo, simplemente imprimimos los documentos duplicados.
            # Este código podría modificarse fácilmente para eliminar los duplicados
            # aquí en lugar de imprimirlos.
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

Conclusión

En este blog, mostramos dos métodos para la deduplicación de documentos en Elasticsearch. El primer método usa Logstash para eliminar los documentos duplicados, y el segundo usa un script personalizado de Python para encontrar y eliminar los documentos duplicados.

Si tienes alguna duda sobre la deduplicación de documentos de Elasticsearch o cualquier otro tema relacionado con Elasticsearch, echa un vistazo a nuestros foros de debate para acceder a conocimientos e información valiosos.