Engenharia

Como encontrar e remover documentos duplicados no Elasticsearch

Muitos sistemas que direcionam dados para o Elasticsearch tiram proveito dos valores de ID gerados automaticamente do Elasticsearch para documentos recém-inseridos. No entanto, se a fonte de dados acidentalmente enviar o mesmo documento para o Elasticsearch várias vezes, e se esses valores de_id gerados automaticamente forem usados para cada documento que o Elasticsearch inserir, esse mesmo documento será armazenado várias vezes no Elasticsearch com diferentes valores de _id. Se isso ocorrer, poderá ser necessário encontrar e remover essa duplicidade. Portanto, neste post, mostraremos como detectar e remover documentos duplicados do Elasticsearch (1) usando o Logstash ou (2) usando código personalizado escrito em Python.

Exemplo de estrutura de documento

Para os fins deste post do blog, vamos supor que os documentos no cluster do Elasticsearch tenham a estrutura a seguir. Isto corresponde a um conjunto de dados contendo documentos que representam negociações no mercado de ações.

   {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

Dado este exemplo de estrutura de documento, para os fins deste blog, supomos arbitrariamente que, se vários documentos têm os mesmos valores para os campos ["CAC", "FTSE", "SMI"], eles são duplicados uns dos outros.

Uso do Logstash para deduplicação de documentos do Elasticsearch

O Logstash pode ser usado para detectar e remover documentos duplicados de um índice do Elasticsearch. A técnica é descrita neste blog sobre como lidar com duplicidade com o Logstash, e esta seção demonstra um exemplo concreto que aplica essa abordagem.

No exemplo abaixo, escrevi uma configuração simples do Logstash que lê documentos de um índice em um cluster do Elasticsearch e, em seguida, usa o filtro de impressão digital para calcular um valor exclusivo de _id para cada documento com base em um hash dos campos ["CAC", "FTSE", "SMI"]. Por fim, grava cada documento de volta em um novo índice no mesmo cluster do Elasticsearch, de forma que documentos duplicados sejam gravados no mesmo _id e, portanto, eliminados.

Além disso, com pequenas modificações, o mesmo filtro do Logstash também pode ser aplicado a documentos futuros gravados no índice recém-criado, a fim de garantir que as duplicidades sejam removidas quase em tempo real. Isso pode ser conseguido alterando a seção de entrada no exemplo abaixo para aceitar documentos da sua fonte de entrada em tempo real, em vez de extrair documentos de um índice existente.

Esteja ciente de que o uso de valores personalizados de _id (ou seja, um _id não gerado pelo Elasticsearch) terá algum impacto no desempenho de gravação das suas operações de índice.

Além disso, vale ressaltar que, dependendo do algoritmo de hash usado, essa abordagem pode teoricamente resultar em um número diferente de zero de colisões de hash para o valor do _id, o que teoricamente poderia resultar em dois documentos não idênticos sendo mapeados para o mesmo _id, causando a perda de um desses documentos. Na maioria dos casos práticos, a probabilidade de uma colisão de hash é muito baixa. Uma análise detalhada das diferentes funções de hash está além do escopo deste blog, mas a função de hash usada no filtro de impressão digital deve ser considerada com cuidado, pois terá um impacto no desempenho da ingestão e no número de colisões de hash.

Uma configuração simples do Logstash para desduplicar um índice existente usando o filtro de impressão digital é fornecida abaixo.

input {
  # Leia todos os documentos do Elasticsearch 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# Este filtro foi atualizado em 18 de fevereiro de 2019
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- Nova linha adicionada desde a data original do post
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

Script Python personalizado para deduplicação de documentos do Elasticsearch

Uma abordagem com uso eficiente da memória

Se o Logstash não for usado, a deduplicação poderá ser realizada de forma eficiente com um script Python personalizado. Para essa abordagem, calculamos o hash dos campos ["CAC", "FTSE", "SMI"] que definimos para identificar um documento de forma exclusiva. Em seguida, usamos esse hash como uma chave em um dicionário Python, onde o valor associado de cada entrada do dicionário será uma matriz dos _ids dos documentos que são mapeados para o mesmo hash.

Se mais de um documento tiver o mesmo hash, os documentos duplicados mapeados para o mesmo hash poderão ser excluídos. Como alternativa, se você estiver preocupado com a possibilidade de colisões de hash, o conteúdo dos documentos que mapearem para o mesmo hash poderá ser examinado para verificação se os documentos são realmente idênticos e, nesse caso, então, as duplicidades poderão ser eliminadas.

Análise do algoritmo de detecção

Para um índice de 50 GB, se considerarmos que o índice contém documentos com tamanho médio de 0,4 KB, haveria 125 milhões de documentos. Nesse caso, a quantidade de memória necessária para armazenar as estruturas de dados de desduplicação na memória ao usar um hash md5 de 128 bits seria da ordem de 128 bits x 125 milhões = 2 GB de memória; além disso, os _ids de 160 bits exigirão outros 160 bits x 125 milhões = 2,5 GB de memória. Portanto, esse algoritmo exigirá 4,5 GB de RAM para manter todas as estruturas de dados relevantes na memória. Esse espaço de memória poderá ser reduzido drasticamente se a abordagem discutida na seção a seguir puder ser aplicada.

Melhoria no algoritmo

Nesta seção, apresentamos uma melhoria em nosso algoritmo para reduzir o uso de memória e remover continuamente novos documentos duplicados.

Se você estiver armazenando dados de séries temporais e souber que documentos duplicados ocorrerão apenas dentro de um breve espaço de tempo um do outro, poderá melhorar o espaço de memória desse algoritmo executando repetidamente o algoritmo em um subconjunto dos documentos no índice, com cada subconjunto correspondendo a um intervalo de tempo diferente. Por exemplo, se você tivesse um ano de dados, poderia usar consultas de intervalo no campo datetime (dentro de um contexto de filtro para obter o melhor desempenho) para percorrer seu conjunto de dados uma semana de cada vez. Isso exigiria que o algoritmo fosse executado 52 vezes (uma vez para cada semana) e, nesse caso, essa abordagem reduziria o espaço da memória do pior caso por um fator de 52.

No exemplo acima, talvez você esteja preocupado com o fato de não detectar documentos duplicados em um período de algumas semanas. Vamos supor que você saiba que documentos duplicados não podem ocorrer com mais de duas horas de intervalo. Em seguida, você precisaria garantir que cada execução do algoritmo inclua documentos que se sobreponham em duas horas ao último conjunto de documentos analisados pela execução anterior do algoritmo. Para o exemplo semanal, portanto, você precisaria consultar 170 horas (1 semana + 2 horas) de documentos de séries temporais para garantir que nenhuma duplicação seja perdida.

Se quiser fazer uma limpeza periódica de documentos duplicados dos seus índices de forma contínua, você poderá executar esse algoritmo em documentos recebidos recentemente. A mesma lógica se aplica como mencionado acima: verifique se os documentos recebidos recentemente estão incluídos na análise, juntamente com uma sobreposição suficiente a documentos um pouco mais antigos, para garantir que duplicatas não sejam perdidas inadvertidamente.

Código Python para detectar documentos duplicados

O código a seguir demonstra como os documentos podem ser avaliados com eficiência para verificar se são idênticos e depois ser eliminados, se desejado. No entanto, para evitar a exclusão acidental de documentos, neste exemplo, não executamos uma operação de exclusão. Essa funcionalidade poderia ser incluída facilmente.

O código para desduplicar documentos do Elasticsearch também pode ser encontrado no github.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# A linha a seguir define os campos que serão
# usado para determinar se um documento é duplicado
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# Processar documentos retornados pela pesquisa/scroll atual
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # Se o hashval for novo, criaremos uma nova chave
        # no dict_of_duplicate_docs, ao qual será
        # atribuído o valor de uma matriz vazia.
        # Em seguida, colocamos o _id imediatamente na matriz.
        # Se o hashval já existir, então
        # apenas colocaremos o novo _id na matriz existente
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# Faça um loop em todos os documentos no índice e preencha a
# estrutura de dados do dict_of_duplicate_docs.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # Obtenha o ID do scroll
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # Antes do scroll, processe o lote atual de ocorrências
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # Processe o lote atual de ocorrências
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # Atualize o ID do scroll
        sid = data['_scroll_id']
        # Obtenha o número de resultados retornados no último scroll
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # Pesquise o hash dos valores dos documentos para ver se algum
    # hash duplicado foi encontrado
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # Obtenha os documentos que foram mapeados para o hashval atual
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # Neste exemplo, apenas imprimimos os documentos duplicados.
            # Este código poderia ser facilmente modificado para excluir duplicatas
            # aqui em vez de imprimi-las
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

Conclusão

Neste post do blog, demonstramos dois métodos para deduplicação de documentos no Elasticsearch. O primeiro método usa o Logstash para remover documentos duplicados, e o segundo método usa um script Python personalizado para encontrar e remover documentos duplicados.

Se tiver alguma dúvida sobre deduplicação de documentos do Elasticsearch ou sobre qualquer outro tópico relacionado ao Elasticsearch, consulte os fóruns de discussão para obter valiosos insights e informações.