エンジニアリング

Elasticsearchでの重複ドキュメントの発見と削除方法

データをElasticsearchに送信している多くのシステムでは、 新たに挿入されるドキュメントに対してElasticsearchが自動で生成するID値を活用しています。しかし、データソースが誤って同じドキュメントを複数回Elasticsearchに送信し、それぞれのドキュメントにそのような自動生成の_id値が使用された場合、この同じドキュメントが複数回、異なる_id値でElasticsearchに保存されることになります。これが発生した場合は、そのような重複を見つけ、削除することが必要になる場合があります。そこでこのブログでは、(1)Logstashを使用して、または(2)Pythonで記述されたカスタムコードを使用して、Elasticsearchの重複ドキュメントを検出し、削除する方法について説明します。

ドキュメント構造の例

このブログ用として、Elasticsearchクラスター内のドキュメントが次のような構造を持っているとします。これは、株式市場の取引を表すドキュメントを含むデータセットに対応しています。

   {
      "_index": "stocks",
      "_type": "doc",
      "_id":"6fo3tmMB_ieLOlkwYclP",
      "_version":1,
      "found": true,
      "_source": {
        "CAC":1854.6,
        "host":"Alexanders-MBP",
        "SMI":2061.7,
        "@timestamp":"2017-01-09T02:30:00.000Z",
        "FTSE":2827.5,
        "DAX":1527.06,
        "time":"1483929000",
        "message":"1483929000,1527.06,2061.7,1854.6,2827.5\r",
        "@version":"1"
      }
    }

このブログ用として、このドキュメント構造の例を使用します。そして、複数のドキュメントで["CAC", "FTSE", "SMI"]フィールドの値が同じ(お互いに重複している)場合を想定します。

Logstashを使用してElasticsearchドキュメントの重複を排除

Logstashを使用して、重複しているドキュメントを検出し、Elasticsearchインデックスから削除することができます。そのテクニックについては、Logstashで重複に対処する方法に関するこちらのブログで説明しているため、このセクションではそのアプローチに該当する具体例を説明することにします。

下記の例は、私が記述したシンプルなLogstash構成です。これは、Elasticsearchクラスターのインデックスからドキュメントを読み取り、そしてフィンガープリントフィルターを使用して、["CAC", "FTSE", "SMI"]フィールドのハッシュに基づいて各ドキュメントの一意の_id値を計算し、最後に、各ドキュメントを同じElasticsearchクラスターの新しいインデックスに書き込みます。これにより、重複したドキュメントが同一の_id値となり、重複が排除されます。

さらに、同じLogstashフィルターにいくつかの簡単な修正を加えることで、新たに作成されるインデックスに書き込まれる今後のドキュメントに適用することも可能です。これにより、ほぼリアルタイムで重複を回避することができます。これは、既存のインデックスからドキュメントを取得するのではなく、リアルタイムの入力ソースからドキュメントを受け入れるように入力セクションを変更(下記の例を参照)することで達成できます。

注意が必要なのは、カスタムの_id値(つまりElasticsearchで生成されていない_id)を使用する場合は、インデックス操作の書き込みパフォーマンスに影響が及ぶことです。

また、使用するハッシュアルゴリズムによっては、このアプローチでは理論的に、_id値に対してゼロ以外の数のハッシュ衝突が発生する可能性があることに注意が必要です。これによって理論的に、異なる2つのドキュメントが同一の_idがマッピングされ、どちらかのドキュメントが失われる可能性があります。ただし、実際のケースのほとんどにおいては、ハッシュ衝突の可能性は非常に低いものです。さまざまなハッシュ関数の詳細な分析についてはこのブログでは扱いませんが、フィンガープリントフィルターで使用されるハッシュ関数は、入力パフォーマンスとハッシュ衝突数に影響を与えるため、慎重に考慮するべきです。

フィンガープリントフィルターを使用して既存のインデックスの重複を排除する簡単なLogstash構成は、以下のようになります。

input {
  # Elasticsearchからすべてのドキュメントを読み込みます 
  elasticsearch {
    hosts => "localhost"
    index => "stocks"
    query => '{ "sort": [ "_doc" ] }'
  }
}
# このフィルターは2019年2月18日に更新されています
filter {
    fingerprint {
        key => "1234ABCD"
        method => "SHA256"
        source => ["CAC", "FTSE", "SMI"]
        target => "[@metadata][generated_id]"
        concatenate_sources => true # <-- 最初の記述日以降に追加された新しい行
    }
}
output {
    stdout { codec => dots }
    elasticsearch {
        index => "stocks_after_fingerprint"
        document_id => "%{[@metadata][generated_id]}"
    }
}

カスタムのPythonスクリプトを使用してElasticsearchドキュメントの重複を排除

メモリー効率の高いアプローチ

Logstashを使用しない場合は、カスタムのPythonスクリプトで効率的に重複の排除を達成できます。このアプローチでは、定義済みの["CAC", "FTSE", "SMI"]フィールドのハッシュを計算して、ドキュメントを一意に特定します。そして、そのハッシュをPythonの辞書のキーとして使用します。辞書の各エントリーに関連付けられた値は、同じハッシュにマッピングされるドキュメントの_idsの配列となります。

2つ以上のドキュメントが同一のハッシュを持っている場合、同じハッシュにマッピングされた重複ドキュメントは削除可能です。または、ハッシュの衝突の可能性に懸念がある場合は、同じハッシュにマッピングされているドキュメントのコンテンツを精査して、それらのドキュメントが本当に同一かどうかを確認し、同一の場合は重複を排除することができます。

検出アルゴリズム分析

50 GBのインデックスの場合、そのインデックスに平均0.4 kBのサイズのドキュメントが含まれていると仮定すると、そのインデックスには1億2,500万件のドキュメントが含まれていることになります。128ビットのmd5ハッシュを使用した場合、重複を排除したデータ構造を保存するのに必要なメモリー量はおよそ128ビットx 1億2,500万 = 2 GBのメモリーとなり、さらに160ビットの_idsには別途、160ビットx1億2,500万 = 2.5 GBのメモリーが必要になります。したがってこのアルゴリズムでは、関連するすべてのデータ構造をメモリーに保存するために、およそ4.5 GBのRAMが必要になります。このメモリーフットプリントは、次のセクションで説明するアプローチを適用すれば大幅に削減できます。

アルゴリズムの強化

このセクションでは、メモリー使用量を減らし、新たな重複ドキュメントを継続的に削除するためのアルゴリズムの強化について説明します。

時系列データを保存していて、重複するドキュメントが互いにわずかな時間内でのみ発生することが分かっている場合、インデックス内のドキュメントのサブセット(異なる期間に対応しているサブセット)に対してアルゴリズムを順次実行していくことで、このアルゴリズムのメモリーフットプリントを改善できる場合があります。たとえば、1年間のデータを保有している場合、日時フィールドに対して範囲クエリーを使用(最良のパフォーマンスを得るためにはフィルターコンテキスト内で使用)して、 一度に1週間ずつのデータセットで実行します。各週に対して1度実行すると、このアルゴリズムの実行は52回必要になります。この場合、このアプローチによってメモリーフットプリントは最低でも52分の1に削減されます。

上記の例では、1週間を超える間隔で発生した重複ドキュメントを検出できないと心配する方もいるでしょう。そこで、2時間以上の間隔でドキュメントの重複が発生しないことが分かっている場合を考えてみます。この場合、アルゴリズムの各実行は、前回の実行によって分析された最後のドキュメントセットから2時間分のドキュメントを含むようにして実行する必要があります。したがって週ごとの場合は、重複ドキュメントを確実に確認するために、170時間(1週間プラス2時間)分の時系列ドキュメントに対してクエリーを実行する必要があります。

インデックスから定期的かつ継続的に重複ドキュメントを削除する場合は、最近受領したドキュメントに対してこのアルゴリズムを実行します。上記と同じ論理が当てはまります。つまり、最近受領したドキュメントと、上記のように対象期間を十分に取ってその期間内の古いドキュメントを含めて分析し、重複をうっかり見逃さないようにすることが重要です。

重複ドキュメントを検出するためのPythonコード

以下のコードは、ドキュメントが同一かどうかを効率的に評価し、必要な場合にはその重複を排除する方法です。ただし、ドキュメントを誤って削除してしまうことを防ぐために、この例では実際に削除操作は実行しないようにしてあります。削除操作は簡単に含めることができます。

Elasticsearchからドキュメントの重複を排除するコードについては、GitHubでも見つけることができます。

#!/usr/local/bin/python3
import hashlib
from elasticsearch import Elasticsearch
es = Elasticsearch(["localhost:9200"])
dict_of_duplicate_docs = {}
# 次の行は、ドキュメントが重複しているかどうかを
判断するために使用するフィールドを定義します
keys_to_include_in_hash = ["CAC", "FTSE", "SMI"]
# 現在の検索/スクロールで返されたドキュメントを処理します
def populate_dict_of_duplicate_docs(hits):
    for item in hits:
        combined_key = ""
        for mykey in keys_to_include_in_hash:
            combined_key += str(item['_source'][mykey])
        _id = item["_id"]
        hashval = hashlib.md5(combined_key.encode('utf-8')).digest()
        # hashval(ハッシュ値)が新しい場合は、
        # dict_of_duplicate_docsに新しいキーを作成します
        # これには空の配列の値が割り当てられます
        # その直後に_idを配列にプッシュします
        # hashvalがすでに存在している場合は、
        # 新しい_idを既存の配列にプッシュします
        dict_of_duplicate_docs.setdefault(hashval, []).append(_id)
# インデックスのすべてのドキュメントに実行し、
# dict_of_duplicate_docsデータ構造を投入します
def scroll_over_all_docs():
    data = es.search(index="stocks", scroll='1m',  body={"query": {"match_all": {}}})
    # スクロールIDを取得します
    sid = data['_scroll_id']
    scroll_size = len(data['hits']['hits'])
    # スクロールする前に、現在のヒットのバッチを処理します
    populate_dict_of_duplicate_docs(data['hits']['hits'])
    while scroll_size > 0:
        data = es.scroll(scroll_id=sid, scroll='2m')
        # 現在のヒットのバッチを処理します
        populate_dict_of_duplicate_docs(data['hits']['hits'])
        # スクロールIDを更新します
        sid = data['_scroll_id']
        # 最後のスクロールで返された結果数を取得します
        scroll_size = len(data['hits']['hits'])
def loop_over_hashes_and_remove_duplicates():
    # ドキュメント値のハッシュを検索し、
    # 重複したハッシュが見つかったどうかを確認します
    for hashval, array_of_ids in dict_of_duplicate_docs.items():
      if len(array_of_ids) > 1:
        print("********** Duplicate docs hash=%s **********" % hashval)
        # 現在のhashvalにマッピングされたドキュメントを取得します
        matching_docs = es.mget(index="stocks", doc_type="doc", body={"ids": array_of_ids})
        for doc in matching_docs['docs']:
            # この例では重複ドキュメントの出力のみを行っています
            # このコードは、重複を出力するのではなく、
            削除するように簡単に修正できます
            print("doc=%s\n" % doc)
def main():
    scroll_over_all_docs()
    loop_over_hashes_and_remove_duplicates()
main()

まとめ

このブログでは、Elasticsearchでのドキュメントの重複を排除する方法を2つ説明しました。最初の方法ではLogstashを使用して重複ドキュメントを削除し、2つ目の方法ではカスタムのPythonスクリプトを使用して重複ドキュメントを見つけ、削除します。

Elasticsearchでのドキュメントの重複排除について、またはElasticsearch関連のトピックについてご質問がある場合は、ディスカッションフォーラムにアクセスし、重要なインサイトや情報を入手してください。