엔지니어링

Elasticsearch에서 중복 문서를 찾아 제거하는 방법

데이터를 Elasticsearch로 보내는 많은 시스템에서는 새로 삽입된 문서에 Elasticsearch의 자동 생성된 id 값을 활용합니다. 그러나 실수로 데이터 소스에서 동일한 문서를 여러 번 Elasticsearch로 전송하는 경우 그리고 이렇게 자동 생성된 _id 값을 Elasticsearch가 삽입하는 각 문서에 사용하는 경우 다른 _id 값을 가진 동일한 문서가 Elasticsearch에 여러 번 저장됩니다. 이 경우 이러한 중복 문서를 찾아서 제거해야 할 수 있습니다. 따라서 이 블로그 게시물에서는 (1) Logstash를 사용하거나 (2) Python으로 작성된 사용자 정의 코드를 사용하여 Elasticsearch에서 중복 문서를 탐지하고 제거하는 방법을 다룹니다.

문서 구조 예제

이 블로그 게시물의 목적상, Elasticsearch 클러스터에 있는 문서들의 구조가 다음과 같다고 가정합니다. 이는 주식 시장 거래를 나타내는 문서가 포함된 데이터 세트에 해당합니다.

   {
      "_index": "stocks",
      "_type": "doc",
      "_id": "6fo3tmMB_ieLOlkwYclP",
      "_version": 1,
      "found": true,
      "_source": {
        "CAC": 1854.6,
        "host": "Alexanders-MBP",
        "SMI": 2061.7,
        "@timestamp": "2017-01-09T02:30:00.000Z",
        "FTSE": 2827.5,
        "DAX": 1527.06,
        "time": "1483929000",
        "message": "1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version": "1"
      }
    }

이 문서 구조 예제를 고려할 때, 본 블로그의 목적상, ["CAC", "FTSE", "SMI"] 필드의 값이 동일한 문서가 여러 개인 경우, 서로 중복되는 것으로 임의로 가정합니다.

Logstash를 사용하여 Elasticsearch에서 중복 문서 제거

Elasticsearch 인덱스에서 중복 문서를 탐지하고 제거하는 데 Logstash를 사용할 수 있습니다. 이 기법은 Logstash를 사용한 중복 처리에 대한 이 블로그 게시물에 설명되어 있으며, 이 섹션에서는 이러한 접근 방식을 적용하는 구체적인 예를 보여드리겠습니다.

Elasticsearch 클러스터의 인덱스에서 문서를 읽은 다음, 지문 필터를 사용하여 ["CAC", "FTSE", "SMI"] 필드의 해시를 기반으로 각 문서에 대한 고유한 _id 값을 계산한 후, 마지막으로 각 문서를 동일한 Elasticsearch 클러스터의 새 인덱스에 다시 써서 중복된 문서가 동일한 _id에 작성되어 제거되도록 하는 간단한 Logstash 구성을 아래 예제로 작성했습니다.

또한 이를 약간만 수정하면 새로 생성된 인덱스에 작성되는 향후 문서에 동일한 Logstash 필터를 적용하여 거의 실시간으로 중복이 제거되도록 할 수도 있습니다. 기존 인덱스에서 문서를 끌어오는 대신 실시간 입력 소스에서 문서를 받도록 아래 예제의 입력 섹션을 변경하면 됩니다.

사용자 정의 _id 값(즉, Elasticsearch에서 생성하지 않은 _id)을 사용하면 인덱스 작업의 쓰기 성능에 어느 정도 영향을 미칩니다.

또한 이 접근 방식을 사용하면 사용된 해시 알고리즘에 따라 이론적으로 _id 값에 대한 해시 충돌 횟수가 0이 안될 수 있으므로, 이론적으로 두 개의 동일하지 않은 문서가 동일한 _id에 매핑되어 이러한 문서 중 하나가 손실될 수 있습니다. 대부분의 실제 사례에서는 해시 충돌 가능성이 매우 낮습니다. 서로 다른 해시 함수에 대한 상세한 분석은 이 블로그의 범위를 벗어나지만, 지문 필터에 사용되는 해시 함수는 수집 성능 및 해시 충돌 횟수에 영향을 미치기 때문에 신중하게 검토해야 합니다.

지문 필터를 사용하여 기존 인덱스에서 중복을 제거하기 위한 간단한 Logstash 구성은 다음과 같습니다.

input {
  # Elasticsearch의 모든 문서를 읽습니다. 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# 이 필터는 2019년 2월 18일에 업데이트되었습니다.
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- 원래 게시 날짜 이후에 새로운 줄이 추가되었습니다.
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

Elasticsearch 중복 문서 제거를 위한 사용자 정의 Python 스크립트

메모리 효율적인 접근 방식

Logstash를 사용하지 않는 경우 사용자 정의 Python 스크립트를 사용하여 중복 문서를 효율적으로 제거할 수 있습니다. 이 접근 방식에서는 문서를 고유하게 식별하기 위해 정의한 ["CAC", "FTSE", "SMI"] 필드의 해시를 계산합니다. 그런 다음 이 해시를 Python 사전의 키로 사용합니다. 여기서 각 사전 항목의 관련 값은 동일한 해시에 매핑되는 문서의 _ids 어레이입니다.

둘 이상의 문서에 동일한 해시가 있는 경우 동일한 해시에 매핑되는 중복 문서를 삭제할 수 있습니다. 또는 해시 충돌 가능성이 우려되는 경우에는 동일한 해시에 매핑되는 문서의 내용을 검사하여 문서가 실제로 동일한지 여부를 확인할 수 있으며, 동일한 경우 중복 문서를 제거할 수 있습니다.

탐지 알고리즘 분석

인덱스가 50GB이며 평균 크기가 0.4KB인 문서가 포함되어 있다고 가정하면 인덱스에 있는 문서의 수는 1억 2천5백만 개가 됩니다. 이 경우 128비트 md5 해시를 사용할 때 중복 제거 데이터 구조를 메모리에 저장하는 데 필요한 메모리의 양은 128비트 x 1억 2천5백만 = 2GB이며, 160비트 _ids에는 160비트 x 1억 2천5백만 = 2.5GB의 메모리가 추가로 필요합니다. 따라서 이 알고리즘에서 모든 관련 데이터 구조를 메모리에 유지하려면 약 4.5GB의 RAM이 필요합니다. 다음 섹션에서 설명하는 접근 방식을 적용할 수 있다면 이 메모리 사용 공간을 크게 줄일 수 있습니다.

알고리즘 개선

이 섹션에서는 메모리 사용량을 줄이고 새로운 중복 문서를 지속적으로 제거할 수 있도록 알고리즘을 개선한 부분을 설명합니다.

시계열 데이터를 저장하는 경우 중복 문서가 서로 일정 시간 내에만 발생하는 것을 알고 있다면, 인덱스에 있는 문서의 하위 집합(다른 시간 창에 해당하는 각 하위 집합)에 대해 알고리즘을 반복적으로 실행하여 이 알고리즘의 메모리 사용 공간을 개선할 수 있습니다. 예를 들어 몇 년 분량의 데이터가 있는 경우 최적의 성능을 위해 필터 컨텍스트 내에 있는 날짜 시간 필드에 범위 쿼리를 사용하여 한 번에 1주일씩 단계별로 데이터 세트를 살펴볼 수 있습니다. 이렇게 하려면 알고리즘이 52회(매주 1회) 실행되어야 합니다. 이 경우 최악의 메모리 사용 공간의 1/52로 줄일 수 있습니다.

위의 예에서는 몇 주에 걸친 중복 문서를 탐지하지 못할 수 있습니다. 간격이 2시간 이상 벌어지면 중복 문서가 발생할 수 없다는 것을 알고 있다고 가정해 보겠습니다. 그렇다면 알고리즘을 실행할 때마다 이전 알고리즘 실행으로 분석된 마지막 문서 세트와 2시간씩 겹치는 문서가 포함되도록 해야 합니다. 따라서 주간 예제에서는 중복 항목이 누락되지 않도록 170시간(1주 + 2시간) 분량의 시계열 문서를 쿼리해야 합니다.

계속해서 인덱스에서 중복 문서를 주기적으로 지우려면 최근에 받은 문서에 이 알고리즘을 실행하면 됩니다. 이때 위와 동일한 논리가 적용됩니다. 즉, 최근에 받은 문서가 약간 오래된 문서와 함께 분석에 포함되도록 하여 중복 문서가 실수로 누락되지 않도록 해야 합니다.

중복 문서를 탐지하는 Python 코드

다음 코드는 문서가 동일한지 여부를 효율적으로 평가한 후 원하는 경우 삭제할 수 있는 방법을 보여줍니다. 그러나 실수로 문서가 삭제되는 것을 방지하기 위해 이 예제에서 삭제 작업을 실제로 실행하지는 않습니다. 이러한 기능은 간단하게 추가할 수 있습니다.

Elasticsearch에서 중복 문서를 제거하는 코드는 github에서도 찾을 수 있습니다.

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# 다음 줄은 문서의 중복 여부를
# 판단하는 데 사용될 필드를 정의합니다.
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# 현재 검색/스크롤에 의해 반환된 문서를 처리합니다.
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # hashval이 새로운 것이라면
        # dict_of_duplicate_docs에 새 키를 생성하며
        # 이 키에는 빈 어레이의 값이 할당됩니다.
        # 그런 다음 _id를 즉시 어레이로 푸시합니다.
        # hashval이 이미 존재한다면
        # 새로운 _id를 기존 어레이로 푸시합니다.
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# 인덱스의 모든 문서를 반복하고
# dict_of_duplicate_docs 데이터 구조를 채웁니다.
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # 스크롤 ID를 가져옵니다.
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # 스크롤하기 전에 적중 결과의 현재 배치를 처리합니다.
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # 적중 결과의 현재 배치를 처리합니다.
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # 스크롤 ID를 업데이트합니다.
        sid = data['_scroll_id']
        # 마지막 스크롤에 반환된 결과 수를 가져옵니다.
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # 중복 해시가 있는지 확인하기 위해
    # 문서 값의 해시를 검색합니다.
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # 현재 hasval에 매핑된 문서를 가져옵니다.
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # 이 예제에서는 중복 문서만 인쇄합니다.
            # 이 코드는 인쇄하는 대신 여기서 중복을
            # 삭제하도록 간단하게 수정할 수 있습니다.
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

결론

이 블로그 게시물에서는 Elasticsearch에서 중복 문서를 제거할 수 있는 두 가지 방법을 보여드렸습니다. 첫 번째 방법은 Logstash를 사용하여 중복 문서를 제거하는 것이고, 두 번째 방법은 사용자 정의 Python 스크립트를 사용하여 중복 문서를 찾아 제거하는 것입니다.

Elasticsearch에서 중복 문서 제거 또는 기타 Elasticsearch 관련 주제에 대한 질문이 있으시면 토론 포럼을 통해 귀중한 통찰력과 정보를 얻으시기 바랍니다.