Reindexが登場!
_reindexと_update_by_queryが、Elasticsearch 2.3.0と5.0.0-alpha1に登場しました!
_reindexは、ドキュメントをひとつのindexから読み込み、他のindexに書き込みます。これにより、ドキュメントをあるindexから他のindexにコピーしたり、あらたな情報をfieldとして加えたり、index作成時に決められた設定を、indexの再作成により変更できます。
_update_by_queryは、ドキュメントをひとつのindexから読み取り、同じindexに書き戻します。これにより多くのドキュメントに含まれるfieldの更新や、mappingの変更をオンラインで行えます。
_reindex はドキュメントをコピー
この_reindex APIは、ドキュメントをあるindexから他にコピーするだけの便利な機能です。他にできることは全て副次的なものです。もし、あなたが全ドキュメントを、src indexから、dest indexにコピーするのであれば、_reindexを以下のとおり実行するだけです:
curl -XPOST localhost:9200/_reindex?pretty -d'{
"source": {
"index": "src"
},
"dest": {
"index": "dest"
}
}'
もしある条件に一致するものだけをコピー、例えば、bananasとタグ付けされたドキュメントだけであれば、以下のように_reindexを呼び出します:
curl -XPOST localhost:9200/_reindex?pretty -d'{
"source": {
"index": "src",
"query": {
"match": {
"tags": "bananas"
}
}
},
"dest": {
"index": "dest"
}
}'
bananasとタグ付けされたドキュメントをコピーし、さらに全てのドキュメントにchocolateタグを追加するには、以下のように_reindexを呼び出します:
curl -XPOST localhost:9200/_reindex?pretty -d'{
"source": {
"index": "src",
"query": {
"match": {
"tags": "bananas"
}
}
},
"dest": {
"index": "dest"
},
"script": {
"inline": "ctx._source.tags += \"chocolate\""
}
}'
Dynamic scriptが有効になっている必要がありますが、同じことがnon-inlineスクリプトでも可能です。
index作成時に決めた設定を変更するためにindexを再作成する場合には、多少の作業が必要ですが、以前よりは簡単になりました。
# このように古いindexを作成
curl -XPUT localhost:9200/test_1 -d'{
"aliases": {
"test": {}
}
}'
for i in $(seq 1 1000); do
curl -XPOST localhost:9200/test/test -d'{"tags": ["bananas"]}'
echo
done
curl -XPOST localhost:9200/test/_refresh?pretty
# 初期値のshard数が気に入らなかったので
# 新しいshard数でコピーを作成
curl -XPUT localhost:9200/test_2 -d'{
"settings": {
"number_of_shards": 1
}
}'
curl -XPOST 'localhost:9200/_reindex?pretty&refresh' -d'{
"source": {
"index": "test"
},
"dest": {
"index": "test_2"
}
}'
# aliasを新しいindexへ
curl -XPOST localhost:9200/_aliases?pretty -d'{
"actions": [
{ "remove": { "index": "test_1", "alias": "index" } },
{ "add": { "index": "test_2", "alias": "index" } }
]
}'
# 最後に古いindexを削除
curl -XDELETE localhost:9200/test_1?pretty
_update_by_queryはドキュメントを変更
それほど便利というわけではありませんが、一番簡単な_update_by_queryの使用法です。
curl -XPOST localhost:9200/test/_update_by_query?pretty
これは、test indexのドキュメントのバージョン番号を増加させ、この処理中にドキュメントを更新しようとすると、失敗します。
もう少し興味深い例は、bananasタグのあるドキュメント全てに、chocolateタグを追加する例です。
curl -XPOST 'localhost:9200/test/_update_by_query?pretty&refresh' -d'{
"query": {
"bool": {
"must": [ {"match": {"tags": "bananas"}} ],
"must_not": [ {"match": {"tags": "chocolate"}} ]
}
},
"script": {
"inline": "ctx._source.tags += \"chocolate\""
}
}'
先ほどと同じように、この処理中にドキュメントを更新しようとすると、失敗しますが、そのような場合には、失敗した場所からリトライするような機能もあります。もしあなたが、bananasタグがあった場合にchocolateタグを追加し、同時にアップデートするようなアプリケーションを作っている場合、_update_by_queryでバージョンの矛盾を安全に無視できます。conflicts=proceedと設定します。これはバージョンの矛盾を数え、アップデートを継続します。このコマンドは以下のとおりです:
curl -XPOST 'localhost:9200/test/_update_by_query?pretty&refresh&conflicts=proceed' -d'{
"query": {
"bool": {
"must": [ {"match": {"tags": "bananas"}} ],
"must_not": [ {"match": {"tags": "chocolate"}} ]
}
},
"script": {
"inline": "ctx._source.tags += \"chocolate\""
}
}'
また、既存のfieldをもとに、異なるmappingのfieldを追加する場合にも、_update_by_queryが使えます。
# tagsはnot_analyzedに
curl -XPUT localhost:9200/test_3?pretty -d'{
"mappings": {
"test": {
"properties": {
"tags": {
"type": "string",
"index": "not_analyzed"
}
}
}
}
}'
for i in $(seq 1 1000); do
curl -XPOST localhost:9200/test_3/test -d'{"tags": ["bananas"]}'
echo
done
curl -XPOST localhost:9200/test_3/_refresh?pretty
# しかし、standard analyzerを使って、bananaとというキーワードを使ってbananasを見つけるために、tagsを検索したくなりました
curl -XPUT localhost:9200/test_3/_mapping/test?pretty -d'{
"properties": {
"tags": {
"type": "string",
"index": "not_analyzed",
"fields": {
"analyzed": {
"type": "string",
"analyzer": "standard"
}
}
}
}
}'
# これはすぐには有効になりません
curl 'localhost:9200/test_3/_search?pretty' -d'{
"query": {
"match": {
"tags.analyzed": "bananas"
}
}
}'
# :(
# しかし、_update_by_query を使って、新しいmappingで全てのドキュメントを更新します
curl -XPOST 'localhost:9200/test_3/_update_by_query?pretty&conflicts=proceed&refresh'
# 新しいmappingがindexに適用されました!
curl 'localhost:9200/test_3/_search?pretty' -d'{
"query": {
"match": {
"tags.analyzed": "bananas"
}
}
}'
ステータスの取得
_reindexと_update_by_queryは、数百万ものドキュメントを触る可能性がありますので、時間がかかることがあります。以下の方法でステータスを取得できます:
curl localhost:9200/_tasks?pretty&detailed&actions=*reindex,*byquery
これには、このようなfieldが含まれます:
"BHgHr0cETkOehwqZ2N_-aQ:28295" : {
"node" : "BHgHr0cETkOehwqZ2N_-aQ",
"id" : 28295,
"type" : "transport",
"action" : "indices:data/write/reindex",
"start_time_in_millis" : 1458767149108,
"running_time_in_nanos" : 5475314,
"status" : {
"total" : 6154,
"updated" : 3500,
"created" : 0,
"deleted" : 0,
"batches" : 36,
"version_conflicts" : 0,
"noops" : 0,
"retries": 0,
"throttled_millis": 0
}
}
これにより、_reindexは、totalの件数の実行を予定していて、updated + created + deleted + noopsの件数の実行が終了しているということがわかります(詳細はをドキュメント参照)。これらを割り算することにより、どの程度完了したか推測できます。
タスクの中止
Elasticsearchには、実行中のタスクを終了する機能がなかったので_reindexの提供には時間がかかりました。短い時間に実行が終わる_searchやindexの作成タスクについては、問題ないでしょう。しかし、前述のように、_reindexや、_update_by_queryは数百万件ものドキュメントを触る可能性があり、時間がかかります。タスクそのものは良いでしょうが、ユーザは困るでしょう。3時間もかかる_update_by_queryにおいて、10分後にスクリプトに誤りがあるのを見つけたとしたら、どうでしょうか。すでに再インデックスされた変更はロールバックできませんが、これ以上変更しないために、キャンセルはできます:
curl -XPOST localhost:9200/_task/{taskId}/_cancel
taskIdはどこから取得できるでしょうか。これは、先ほどの_taskが返すオブジェクトの名前です。この例ではBHgHr0cETkOehwqZ2N_-aQ:28295がそれに当たります。
Elasticsearch内でのタスク停止は、それぞれのタスクが判断を行っています。Javaアプリケーションであれば仕方がないことでもあります。それでも_reindexと_update_by_queryのタスクは、定期的に停止の必要があるか確認し、自身で停止します。ですので、タスクを停止した直後には、タスクのリストにまだ残っている可能性があります。それはそのうちなくなりますが、ノードを停止しない限り、明示的に停止できません。
Elasticsearchがサーチエンジンであることをお忘れなく
全てのドキュメントは削除とマークされてから、新しいドキュメントがインデックスされます。その後、削除とマークされたドキュメントは、マージ処理にてindexから削除されます。_reindexとupdate_by_queryも同様の処理が行われます。これらは、scrollクエリーを実行し、全ての結果をインデックスするのと同じ動作です。。無数の_reindexや_update_by_queryタスクを実行することは、コンピューターのリソースを有効に使うことにはなりません。Elasticsearchにデータを追加した後に変更するより、まずは、データを追加する段階で良いアプリケーションを作っていただくようお願いします。_reindexと_update_by_queryは、すでにElasticsearchに入っているデータを変更する場合に、便利に使ってもらえることでしょう。