2016年06月22日 エンジニアリング

Elastic Stackにちょうど良いKafka パート2

By Suyog Rao

前篇に引き続き、Apache KafkaとElastic Stack についての続編をお届けします。前回は、Elastic StackとKafkaの併用ユースケースを紹介し、時間とユーザーに基づくデータフローを考慮したシステムの設計について説明させていただきました。今回の記事では運用面、つまり、KafkaとLogstashを本番環境で実行して膨大なデータをインジェスチョンする方法を説明したいと思います。

キャパシティプランニング

細かい説明に入る前に申し上げておきますが、今回の説明では現行の安定バージョンであるKafka 0.8とLogstash 2.xを前提としています。Kafkaにはもっと新しいバージョンの0.9もあり、最近になって0.10もリリースされています。ここで説明する基本概念はKafkaのどのバージョンにも適用できます。前置きはこれぐらいにして、さまざまなシステムの説明に入りたいと思います。

Apache ZooKeeper: クラスタを構成するにはブローカーにZooKeeperが必要であったり、トピック設定がZKノードに保存されたりするため、Kafkaには ZooKeeper(ZK)が必要です。また、Logstashのバージョン2.xでは、入力オフセットがZKに保存されます。Kafkaの新しいバージョンでは、クライアントがコンシューマーとプロデューサーとしてZooKeeperとのコミュニケーションから切り離されています。Kafka 0.9と0.10では、オフセットはデフォルトでZKではなく、トピックに格納されます。いずれの場合も、Kafkaブローカーを実行するためにZooKeeperが必要になります。3つのZKインスタンスをすべて別のハードウェアで実行し、クォーラム構成を実現することをお勧めします。ZKの運用についての詳細は、Kafkaのドキュメントの中でも特にすばらしいドキュメントを参照してください。経験から言うと、ZK自体はいったんセットアップしてしまえばそれほど手はかかりません。インスタンスが立ち上がって正常に稼働していることをモニターする必要があるだけです。

Kafka Brokers: Kafkaブローカーがいくつ必要になるかは、データリテンションとレプリケーションの計画によって決まります。ブローカーを追加すればするほど、Kafkaに格納できるデータは多くなります。リソースに関して言えば、KafkaはIOバウンドです。パフォーマンスは、ディスクの速度とファイルシステムのキャッシュによって制限されます。つまり、速いSSDドライブとファイルシステムキャッシュがあれば、1秒間に何百万ものメッセージを楽々処理できます。これらの情報は、topbeat を使用してモニターできます。

Logstash: それではKafkaに格納されたデータを処理するのに必要なLogstashインスタンスはどれぐらいなのか?という問題ですが、この数字はそう簡単には決まりません。フィルタはいくつあるのか、フィルタのコストはどれぐらいなのかなど、多くの可変条件が絡んでくるからです。結局、データを処理するために、予想されるデータ量はどれぐらいか、アウトプットは何かなど、多数の条件が含まれる複雑なGrokパターンを使わざるを得なくなることになりかねません。このように、必要なインスタンス数を見極めるためには、多くの情報が必要になります。キャパシティプランニングでは、Logstash自体ではなく、アウトプット(外部システム)に注目する必要があることが多いのです。ともあれ、LogstashとElasticsearchは簡単に水平的にスケーリングできますから、まずは小規模から始めて、データの増加に応じて、徐々にノードや新しいLSインスタンスを追加していくことをお勧めします。

特に、Logstashが処理するKafka内のデータについては、複数のインスタンスをコンシューマーグループに分けることができます。各グループは負荷を共有し、インスタンスは排他的にデータを処理します。つまり、メッセージはグループ内の1つのクライアントによって1回だけ消費されます。このような設計ゆえに、小規模で始めて段階的にスケーリングしていく、という方法がスムーズに実現するわけです。トピックを使用すれば、より複雑な変換が必要なデータや、もっと遅いアウトプットに格納する必要があるデータを他の高速で動くデータから切り離したワークフローを設計することができます。Logstashでは、1つの遅いアウトプットがあると、その後に実行するように設定されている他のアウトプットがブロックされることがあるので注意が必要です。

Elasticsearch: 前述のように、Elasticsearchは極めて柔軟で、簡単に拡張できます。Elasticsearchのキャパシティプランニングについて説明するとなると、別枠のブログ記事が必要になりますから、今回の記事では取り上げません。以下のElasticsearchのスケーリングとサイジングについての記事 -- ElasticsearchのサイジングElasticsearchのインデキシングその他に関するパフォーマンスの考慮事項をご覧いただくことをお勧めします。

データリテンション

Kafkaインスタンスのディスクスペースが不足する場合は、Kafkaログのリテンション時間の設定が高すぎる可能性があります。Kafkaでは、log.retention.bytes とlog.retention.hours のブローカー設定を使用して、経過時間とサイズという2つの条件に基づいてデータリテンションを設定することができます。いずれかの条件が満たされると、Logstashがメッセージを消費したかどうかに関係なく、Kafkaブローカーは一番古いものから順にメッセージの削除を開始します。

Elasticsearch用のデータリカバリやリテンションを設計するときには、Kafkaのリテンションツールを使用してみたくなると思いますが、経験から言うと、Elasticsearchの時間ベースのインデックスの管理には、Curator のようなツールを使うのがベストです。重大な障害が発生した場合にインデックスを復元するためのスナップショットの設定についても同じです。 Kafkaに格納されているデータは未処理でフィルタ処理もされていないコンテンツが多く、宛先も複数であるため、1つのダウンストリームコンポーネントにタイトに結合するのは避けた方が無難です。

オフセット管理とメッセージ配信の保証

Kafkaのドキュメントからの抜粋:

パーティション内のメッセージには、パーティション内での各メッセージを一意に識別する「オフセット」と呼ばれるID番号が順に割り当てられます。オフセットはコンシューマーによって管理されます。通常、コンシューマーは、メッセージを読み取るごとにオフセット番号を1ずつ増やしていきます。

Kafkaのインプットでは、 ZooKeeperを使用してオフセット情報が記録されます。Logstashがトピックからメッセージを取り出して処理すると、ZKに定期的にコミットされます。このプロセスは「チェックポイント」または「コミット」と呼ばれます。デフォルトでは、Logstashは1分ごとにZKに対してチェックポイントを実行します。チェックポイントの間隔は、auto_commit_interval_ms 設定を使用して変更できます。間隔を長く設定すると、Logstash が強制停止されたり、プロセスがクラッシュしたときに、失われるデータも多くなるので注意が必要です。他方、間隔を短く設定すると、クライアント当たりの書き込み時間が長くなり、ZKクラスタの負担が過剰になる可能性があります。

Logstashを再起動すると、まずはZKに格納されたオフセット情報が読み込まれ、直前のコミットポイントからメッセージの取得が開始されます。Kafkaは、at-least-once セマンティクスに従うように設計されています。つまり、メッセージは失われることはなく、再配信が可能です。これは、オフセットはまだメモリに保存されているがコミットはされていない間にLogstashがクラッシュする場合があることを意味します。すなわち、メッセージが再配信される可能性があり、それはメッセージが重複する可能性がある ことを意味します。メッセージの重複配信を避けたい場合は、メッセージのフィールドに一意なIDを作成/使用することができます。自作のコードで一意なIDを生成する場合も、Logstashのuuid フィルタを使用してIDを生成する場合も、Kafkaにメッセージが入る前に行う必要があります。Logstashの「送信」側では、このイベントIDを、Elasticsearchアウトプットプラグインのdocument_id オプションにマッピングすることができます。これで、同じIDを持つインデックス付きドキュメントがElasticsearchによって上書きされることになり、同じ内容のドキュメントが複数できてしまうことはありません。

万が一、ダウンストリームのデータが失われてコンテンツを再生する必要が生じた場合にもこの仕組みは便利です。別のコンシューマーグループを使用して、そのグループのペースでデータを再生できるわけです。

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}
....
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}

ここで my_uuid は、イベントの既存フィールドです。

モニタリング: 遅れがあるかどうかを判断する方法

Kafkaの使用時にモニターする必要がある重要な項目の1つが、バックアップされ、Logstashによって消費されるのを待っているメッセージがどれぐらいあるかです。この情報をモニターするためのツールは数多くあります。たとえば以下のようなオプションがあります:

KafaにバンドルされたCLIツール

オフセットをチェックするための簡単なコマンドラインツールです。cronjobで定期的に実行して、好きなアラートソフトウェアを使ってアラートを生成できます。

/usr/bin/kafka-consumer-offset-checker --group logstash --topic apache_logs --zookeeper localhost:2181

応答例:

Group    Topic       Pid Offset  logSize Lag     Owner
logstash apache_logs 0   145833  300000  154167  none
logstash apache_logs 1   145720  300000  154280  none
logstash apache_logs 2   145799  300000  154201  none
logstash apache_logs 3   146267  300000  153733  none

Lag列には、処理が遅れているメッセージの数が表示されます。

JMX

Kafkaは、JConsoleを使用してJMX経由で簡単にモニターできます。JMXをアタッチしてLogstashをモニターするには、Logstashを開始する前に以下のようなJavaオプションを設定します:

export LS_JAVA_OPTS="
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.port=3000
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.authenticate=false"

AWSで実行している場合は、外部ホスト名またはサーバーのIPを必ず使用してください。

-Djava.rmi.server.hostname=ec2-107-X-X-X.compute-1.amazonaws.com

Elastic Stack

もちろん、Elastic Stack自体を使ってKafkaをモニターすることもできます。当然、これがイチオシの方法です。この場合には、この目的用に作成された Beat であるKafkabeatを使用します。Kafkabeatは Dale McDiarmidが作成したものです。 このBeatはオフセットとその他のトピック情報を収集し、 Elasticsearchに保存しますから、Kibanaを使ってコンシューマーの遅延を解析できます。ディスクスループットや、CPU、メモリなどのシステムレベルのステータス情報を収集するtopbeat と併用すれば、Kafkaをモニターするための強力なソリューションになります。これですべてのデータを1か所に集めることができるのですから、旧式のディスクを新しいSSDに入れ替えましょう!と上司を説得することもできます。さらに、5.0.0では、アプリケーションやシステムレベルでのモニター情報がMetricbeatと呼ばれる1つのBeatにまとめられるのです。すごいでしょう?

それでは、Kafkabeatに話を戻しましょう。まずは次の手順を実行します:

  1. https://github.com/gingerwizard/kafkabeatをクローンします。
  2. kafkabeatディレクトリ内でmakeを実行します。
  3. KafkabeatをKafkaブローカーに展開し、次のとおり実行します: ./kafkabeat -c kafkabeat.yml

これで、このブローカーとインデックスのすべてのKafkaトピックからオフセット情報が収集され、次のドキュメント構造でElasticsearchにまとめられます。

"@timestamp": "2016-06-22T01:00:43.033Z",
  "beat": {
    "hostname": "Suyogs-MBP-2",
    "name": "Suyogs-MBP-2"
  },
  "type": "consumer",
  "partition": 0,
  "topic": "apache_logs_test",
  "group": "logstash",
  "offset": 3245
  "lag": 60235
}

データをElasticsearchに取り込んだら、後は簡単にKibanaで表示できます。私は最新の Kibana 5.0.0-alpha3 を使用してダッシュボードを作成し、タイムスタンプに基づくコンシューマーのlag フィールドをグラフ表示しています。

Kibana_Kafka2.png 

Kafka Manager

これはオープンソースの UIツール で、Kafka全体を管理することができます。 トピックの作成、メトリクスの追跡記録、オフセットの管理などを実行できます。このツールはコンパイルとビルドまでに時間を要しますが、Kafkaの総合的な管理ソリューションが必要なら、ビルドする価値はあります。 Kafka Managerツールを起動したら、指示に従ってZKインスタンスをポイントし、モニターする新しいクラスタを作成します。

コンシューマービュー

kafka UI.png

トピックビュー

Kafka UI 2.png

まとめ

この記事では、KafkaとLogstashを駆使して、複数のソースからElasticsearchへのデータインジェスチョンの方法を説明しました。さらに続きます!

昨年はKafkaの 0.9.0 バージョンがリリースされ、最近 0.10.0 がリリースされました。この最新リリースには、内蔵セキュリティや新しいコンシューマー実装、データクオータなど、新しい機能が満載されています。Logstashのインプットとアウトプットも更新され、これらの新しい機能をLogstashで利用できるようになっています!次回の記事では、Kafkaの新しい機能について取り上げ、特にKafkaとElastic Stackを使用した総合的なセキュリティについて説明する予定です。

お楽しみに!