エンジニアリング

Elasticsearch: 外部データストアでデータの整合性を検証

Elasticsearchは他のデータベースと併用されることがあります。関与するすべてのシステムでトランザクションがサポートされるわけではない場合、2フェーズコミット が絡んでくると厄介な問題が起きます。ユースケースによっては、片方がもう片方のデータストアに対していわゆる「信頼できる情報源」的な役割を果たす2つのデータストアにデータが実際に存在するかどうか確認する必要がある場合があります。

Elasticのサポートエンジニアには、どんな構造にすればデータが検証しやすくなるか、または検証そのものを効率的にやるにはどうしたらいいか、という相談が多く寄せられます。このブログ記事では、PostgreSQLなどのデータベースからの必要なデータがElasticsearchに含まれているかどうかを検証する方法を、これまで見てきた例や作成のお手伝いをさせていただいた例の中から、シンプルな例から複雑な例までご紹介したいと思います。

検証用にデータをモデル化する

Elasticsearchの内外でデータがどのように保存されているかによって、検証の難度は変わってきます。まずはどの程度検証が必要かを決める必要があります。

  • ドキュメントは存在していればそれだけで十分なのか?
  • または、内部の処理により、ドキュメント全体を検証する必要があるのか?

この判断には、検証の実行に必要な労力がどれぐらいかということも関係してきます。

存在の検証方法の発展

幸い、これはそれほど深い哲学的な質問ではありません。「いまドキュメントは全部Elasticsearch内にあるのか?」という単純な質問です。

何が起きているかをしっかり把握しておけば、Elasticsearchではさまざまな方法で存在を確認することができます。ここで、 Elasticsearchでの検索は、ほぼリアルタイムですが、ドキュメントは直接、リアルタイムで取得で実行できることを覚えておいてください。つまり、ドキュメントにインデキシングすると、検索には引っかかって来なくても、直接のGetリクエストでは取得できるわけです。

ドキュメント単位で確認する方法

小規模であれば、最も簡単なのは個々のドキュメントに対してHEAD リクエストを実行し、HTTP応答コードが404 (page not found、この場合はドキュメントですが) ではないことを確認する方法です。

HEAD /my_index/my_type/my_id1

レスポンスの一例として、Successというヘッダだけが戻ってきます:

HTTP/1.1 200 OK
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

ドキュメントが存在しない場合もメッセージは、レスポンスコード以外は同じです:

HTTP/1.1 404 Not Found
Content-Type: text/plain; charset=UTF-8
Content-Length: 0

この場合は、N ドキュメントがインデックスにあることを確認するのに、N回のリクエストを実行する必要があります。お察しのとおり、この方法はあまり効率的ではありません。

バッチ処理する方法

次に考えられる方法は、複数のGET API: _mgetを使ってバッチリクエストを実行することです。

GET /my_index/my_type/_mget
{
  "ids": [ "my_id1", "my_id2" ]
}

もう1つの方法は、IDを検索して予想される数値を返すことです。

GET /my_index/_refresh
GET /my_index/my_type/_search
{
  "size": 2,
  "query": {
    "ids": {
      "values": [ "my_id1", "my_id2" ]
    }
  }
}

最初に_refreshエンドポイントを呼び出して、インデックスが設定された項目がすべて検索可能であるかどうかを確認します。これで検索の「ほぼリアルタイム検索」の側面を排除できます。通常の検索ではやってはならないことですが、ここでは重要なことです。おそらく確認処理は同時実行されるため、処理が開始された後に入ってきた新しいデータを確認しようとしなければ、ジョブの開始時に_refresh 1度だけ呼び出しておけば充分です。

この方法ではリクエストの処理は速いですが、相変わらずバックグラウンドでの処理を避けることはできません。つまり、ドキュメントは1つずつ返されるわけで、オーバーヘッドが増えることに変わりはありません。

サーチ、その後バッチ

バッチ処理の後は、個別に問題を解決しようとするのが普通です。まずは、検索を実行して何がないかを調べ、ないドキュメントを探しにかかります。

GET /my_index/_refresh
GET /my_index/my_type/_search
{
  "size": 0,
  "query": {
    "ids": {
      "values": [ "my_id1", "my_id2" ]
    }
  }
}

レスポンスのhits.total値は予想していたとおりの数であるはずです。この例の場合は、 2になるはずです。「2」以外であれば、方法を変えてやり直す必要があります:

  1. すべてのIDを検索し、適切なサイズを指定してそれらを取得して、干し草の山から針を探す。
  2. _mgetの例に戻って同じことをやる。つまり、見つからないIDがどれかを調べる。

もっと良い方法があるはず...

長く手順が多い処理や2段階処理は、大規模な展開では面倒なことになります。質問が高くつく可能性もある一方で、もしかしたら質問自体が間違っている可能性もあります。

クエリや質問の仕方を180度転換させて、「存在するものをすべて検索」ではなく、「ないものはどれ?」という効率的なクエリに変えたらどうなるでしょう?「ここにないもの」をクエリするのは難しいことで、実際、データが存在しなければクエリ自体が不可能なのです。ただし、もしも データをうまく構成できれば、aggregationを使って正解を導き出すことができます。

多くの検証のユースケースはSQLの環境から来ており、主キーと外部キーには整数ベースのキーが使われるのが一般的です。Elasticsearch _idに数値IDを使用していない場合でも、 doc valuesを有効 にして、主キーと外部キーは整数ベースの値としてインデキシングする必要があります (ES 2.x以上ではデフォルトで有効になっています)。たとえば:

POST /my_index/my_type/my_id1
{
  "id": 1,
  …
}

フィールド名は何でも構いませんが、 _id以外であることを確認する必要があります。_idは、Elasticsearchのメタデータフィールド用に予約された値です。 この値のインデキシングを開始すれば、 存在しない データの検索がhistogramで非常に簡単にできるようになります:

GET /my_index/my_type/_search
{
  "size": 0,
  "aggs": {
    "find_missing_ids": {
      "histogram": {
        "field": "id",
        "interval": 1,
        "min_doc_count": 0
      },
      "aggs": {
        "remove_existing_bucket_selector": {
          "bucket_selector": {
            "buckets_path": {
              "count": "_count"
            },
            "script": {
              "inline": "count == 0",
              "lang": "expression"
            }
          }
        }
      }
    }
  }
}

これで次の2つのことが実行されます:

  1. それぞれの値が1ずつの間隔になるように数値のidフィールドに対してhistogramを実行します。つまり、各id値の間にある整数ベースの値が検索されます (たとえば、1 - 5 は1、2、3、4、5というヒストグラムになります)。
  2. Elasticsearch 2.0以上でしか使用できないbucket selectorを使用して、実際にデータが含まれるhistogram bucketを削除します。

存在するものが削除されれば、存在しないものが残り、次のようになります:

{
  "took": 4,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 5,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "find_missing_ids": {
      "buckets": [
        {
          "key": 4,
          "doc_count": 0
        }
      ]
    }
  }
}

5つのドキュメント (これはhits.totallを見ると分かりますが) が含まれる小さなインデックスから、id が「4」のドキュメントが存在しないことが分かります。

たったこれだけです。ドキュメントの一意なキーからヒストグラムを作成できれば、欠落しているデータのみを示すヒストグラムを作成できるのです。これは文字列に対しては使えません。つまり、 「my_id」 やUUID (GUIDなど) では、aggregationに用いた方法を使うことはできないのです。結局は、このタイプのチェックができるようにデータを組み立てる必要があります。

隙間に注意

シーケンシャルIDに馴染が少ない人にとって、(SQL データベースなどの) システムがIDを複数プリフェッチすることも珍しくありません。このような処理の場合、一部の値がスキップされている可能性があります。これには、トランザクションの失敗などの多くの理由が考えられます。また、後で明示的にレコードが削除されたなどのもっと単純な原因もありえます。

このような場合、ヒストグラムではスキップされた値は欠落しているドキュメントとして表示される、ということを覚えておく必要があります。クライアントとしてこれを回避するには、これらの idを明示的に除外するか、クライアントサイドで無視するような二次バケットを追加して、スキップされた値を無視するようにElasticsearchに指示することができます。

ドキュメント全体の検証

ドキュメント全体の検証となると、単純な存在確認とはまた話が違ってきます。この場合、期待されるデータがきちんと入っているかどうかを確認するためにドキュメント全体を検証する必要があります。Elasticsearchの観点から見ると、実際にはこっちの方が簡単なのですが、ユーザー側の作業はもっと難しくなります。

データのスクロール

このチェックを実行する最も簡単な方法は、データをスクロールして見ていくことです。とは言っても、HTMLテーブルにすべてのデータを貼り付けてマウスホイールで上から全部見ていくわけではありません。データ全体に _scrollAPIを繰返し使用します (以下で使用している_docについてはリンク先に説明があります)。 他のサーチAPIと同様に、このAPIも前述の「ほぼリアルタイム」の原則で動作します。

GET /my_index/my_type/_search?scroll=1m
{
  "sort": [
    "_doc"
  ]
}

スクロールする時間 =クライアントソフトウェアが次のバッチをリクエストするまでにバッチを処理するのに必要な時間です。以降の レスポンスをループし、リンクされたドキュメントを読んでいけるだけの時間を確保してください。

データをスクロールしていくことで、1つずつドキュメントをチェックし、データがすべて要件を満たしているかどうかを確認できます。

バージョン管理

Elasticsearchは 楽観的な同時実行制御をサポートしています。言い換えれば、バージョン管理です。

バージョン番号を指定してバージョンを完全に管理できます。 これによりバージョン番号を確認するだけで、ドキュメントの検証を回避できる可能性があります。検索応答にバージョン番号を使えるようにするには、バージョンフラグを指定する必要があります:

GET /my_index/my_type/_search
{
  "version": true
}

検証を回避する

データが信頼できるユーザーから提供されたもので、インジェスチョンが失敗した場合でも正しくエラーを処理していれば(たとえば、ドキュメント15123がインデキシングされるはずのときにElasticsearchがダウンしていた場合でも、何かを追加する必要があります。)、検証自体をやらなくて済む可能性もあります。このような場合は、検証は不要です。

5.0のX-Pack Security と以前のリリースのShield は、信頼されるユーザーにはアクセスを許可し、信頼されないユーザーはブロックするという、一定のセキュリティを提供します。ただし、インジェスチョンの失敗は何がインジェスチョンを実行しているかによって対応が異なるため、人間が適切に処理しなければなりません。

まとめ

長いブログ記事を最後までお読みいただき、ありがとうございます!

データ検証の概要と、データの組み立て方によって検証が劇的に簡略化できるという説明をさせていただきました。こういうアプローチがあると知ったことで、クエリやデータへの考え方が変わり、他の問題を解決する上でaggregationやX-Pack Securityなどの便利な機能以外にも何か方法があるのではないか、と考えていただけるようになると思います。

私たちは常に、興味深い問題を解決するための独創的な方法を模索しており、これも例外ではありません。いつものように、ユーザーのみなさんにはぜひフォーラムでこれらの問題についてディスカッションしていただきたいと思います。また、今後何か問題に遭遇したら、GitHubのIssueとして登録してください。 Twitter (私のアカウントは@pickypg です) や IRCでも問題を提議していただけます。