エンジニアリング

Elastic Stackにちょうど良いKafka パート1

The Elastic StackApache Kafkaは、ログ/イベント処理領域において緊密な関係があります。多くの企業が大量のデータを保存し、処理するためのトランスポートレイヤーとしてKafkaを使用しています。これまで見てきた多くの実際の展開では、KafkaはElasticsearchの高速な検索および分析能力を利用する前にデータを処理する重要な役割を担っていました。私たちは今回、ブログの連載記事を通じて、Elastic Stackと統合するときのKafkaのセットアップ方法および管理方法について明らかにします。具体的には、KafkaとLogstashで大量のデータを扱うときの経験について論じます。

注: 連載記事の目的上、Kafkaの0.8.xバージョンについて言及しています。Kafkaの最新版である0.9.xバージョンではいくつかの機能に変更がありますが、0.8.xは現在でも非常に人気があり、広く使用されています。

基本事項

まず、基本的なところから確認していきましょう。Kafkaのドキュメントには以下のように記載されています。

Apache Kafkaは、分散型コミットログとして再考されたパブリッシュ-サブスクライブ型メッセージ機能です

Kafkaは大量のイベントデータを扱うためにLinkedInで開発されました。他の多くのメッセージブローカーと同様に、Publisher/Consumer型メッセージやキューセマンティクスに対応するためにデータをTopicにグループ化しています。アプリケーションとしては、Topicに書き込み、Topicから消費します。Kafkaにおける重要な差別化ポイントまたは設計の違いは、複雑さがProducer側からConsumer側に移動していること、そしてファイルシステムキャッシュを多用していることが挙げられます。このような設計に関する意思決定と、一から分散されているという特徴を併せ持つことから、Kafkaは多くの大容量ストリーミングのユースケースで選ばれています。

Logstashは、JAVA APIにより、Kafkaとネイティブな統合が可能です。インプットアウトプット両方のプラグインが提供されているため、LogstashからKafkaに直接読み書きできます。使い始めるための設定は、極めてシンプルです。

kafka {
   zk_connect => "hostname:port"
   topic_id => "apache_logs"
   ...
}

KafkaにはApache ZooKeeperが必要になるため、Kafkaを実行する場合はZooKeeperクラスタへのアクセス権が必要です。これについては後述します。

KafkaをElastic Stackと併用するべき状況

シナリオ1:イベントスパイク

ログデータやイベントベースのデータの利用量やフローレートが、一貫した予測可能なものであることは稀です。アプリケーションを金曜の夜にアップグレードしたというシナリオを考えてみましょう(金曜日にアップグレードすべきではない理由については別のブログ記事で述べます)。展開しているアプリには過剰に情報を記録するという厄介なバグがあり、記録インフラがログで溢れてしまっています。たとえばゲーム業界やEコマース業界などの、ほかのマルチテナント型のユースケースでも、このようなスパイク(データのバースト)はよく起こります。このシナリオでは、Kafkaのようなメッセージブローカーを使用してLogstashやElasticsearchを利用量の増加から保護します。

このアーキテクチャでは、処理をShipperIndexerという2つの分離されたステージに分割することが一般的です。データを別のデータソースから受信するLogstashインスタンスは、ほとんど処理には関わらないのでShipperと呼ばれます。受信したデータを直ちにKafkaのTopicで存続させる役割を持つため、生産側に属します。一方、より強力なLogstashインスタンスは、Grok、DNSルックアップ、Elasticsearchへのインデックスのような高コストな変換を実行しながら、独自に調整された速度でデータを消費します。このインスタンスはIndexerと呼ばれます

Logstashは、これまでShipperとして使用されてきましたが、私たちは利用可能なElastic Beatsの製品スイートを特化型のShipperとして使用することを強く推奨します。たとえばFilebeatは、ファイルを追跡し、Logstashレシーバー経由でKafkaに出荷できる、軽量でリソースにやさしいエージェントです。

Kafka

注: 現時点では、FilebeatがKafkaに直接書き込むことはできませんが5.0.0(現在プレリリース状態)以降のバージョンでは、アウトプット先としてKafkaを設定できるようになります。この強化により、上記のようなBeatsを使用したデータ消費のユースケースのアーキテクチャはさらにシンプルになります。ぜひ、この機能をはじめとするアルファリリース状態にある新機能を試してご意見をお寄せください! 噂によれば、テストに協力するとElastic{ON}17のフリーパスがもらえるそうです!

シナリオ2:Elasticsearchに接続不可能

別のシナリオを考えてみましょう。マルチノードのElasticsearchクラスタを1.7から2.3にアップグレードしようと計画しているとします。これには、すべてのクラスタの再起動が必要です。あるいは、Elasticsearchのダウンタイムが予想より長い状況だとします。Elasticsearchに対してストリーミングを行っているデータソースが多数あり、元となるデータソースを停止することができない場合、Kafkaのようなメッセージブローカーが役に立ちます!LogstashのShipper/Indexer型アーキテクチャと併せてKafkaを利用している場合、エッジノードからのデータストリーミングを継続して、一時的にそのデータをKafkaで保持することができます。Elasticsearchが再起動したら、Logstashは中断したところから出荷を再開し、溜まっていたデータを埋め合わせることができます。実際、これは当社のソフトウェアの柔軟な性質をうまく利用しています。Logstashのインスタンスを追加することで、一時的に処理能力とインデックス能力を高め、前述のKafkaTopicからのデータを消費できます。さらに、Elasticsearchでノードを追加することもできます。多くのサポートを必要とせずにスケールアウト可能であることも、Elasticsearchのコア機能です。処理が追いついたら、元のインスタンス数にスケールダウンできます。

アンチパターン:KafkaをElastic Stackと併用するべきではない状況

Kafkaを使うべきではないタイミングは、Kafkaの使い時と同じくらい重要な知識です。ものにはすべてコストがあります。Kafkaもまた、稼働環境において手をかける必要のあるソフトウェアです。これには、監視、アラートへの対応、アップグレードなど、稼働環境で正常にソフトウェアを実行するために必要なすべてが含まれます。稼働しているソフトウェアはすべて監視しますよね?

一元化されたログ管理といえば、ログはできるだけ早くエッジノードから出荷されるべきである、ということがよく言われています。これは、一部のユースケースについては正しく述べていますが、自分にとってこれが本当に要件であるかを自問してみましょう! 適度な検索時のレイテンシを許容できるのであれば、Kafkaや後工程でファイルのコンテンツをエッジノードから出荷するFilebeatの使用を完全に省いても、すぐにログローテーションに復帰可能です。つまり、アプリケーションがLogstash/Elasticsearchがリアルタイムに消化可能な量を超えるログを排出しても、たとえばLog4jまたはlogrotate,を使用してファイル全体のログをローテーションできるということです。ただし、それでもインデックス化は必要です。もちろん、これにはサーバーマシンにログを収めるのに十分なディスク容量がある、という別の要件も発生します。言い換えれば、このシナリオではローカルシステムが一時的なバッファになるということです。

KafkaとLogstashについての設計時の検討事項

以下に、KafkaとLogstashを併用する場合の設計時の検討事項を挙げます。Logstashのインプットには、高レベルなKafkaのConsumer APIを使用し、Logstashのアウトプットには、新しいProducer APIを使用します。

Topics

Topicは、メッセージを論理的にグループ化することです。Topicにより、必要に応じてデータを他のConsumerから分離することができます。Kafkaのバージョン0.8では、セキュリティ手段が講じられていないため、ブローカーで利用可能なTopicから任意のConsumerがアクセス可能である点にご注意ください。必要になるTopicの数やデータをどのようにモデル化するかは、データ次第です。以下にいくつかの戦略を紹介します。

ユーザーベースのデータフロー:このケースでは、ユーザーごとにTopicを作成します。KafkaはすべてのパーティションをZooKeeperに登録するので、数百、数千のTopicを作成するのにはコストがかかる点にご留意ください。ユーザーの数が少ない場合(たとえば部門数など)には、このユーザーごとにパーティションを切る戦略が有効です。

HOV lane属性ベースのデータフロー: ログデータやイベント駆動データについては、データ量や予想される検索レイテンシなどの属性に基づく1つのTopicに複数のユーザーをグループ化することもできます。イベントがElasticsearchにインデックスされずにキューにある状態で時間が経過すればするほど、そのようなイベントを検索する場合のレイテンシが大きくなる点にご留意ください。ソリューションの1つが、予想されるSLAに基づき、「高」、「中」、「低」のTopicを作成することです。データ量についても同様です。大量のデータを生み出す顧客/ユーザーには、新しいTopicを与えます。マルチテナント型の展開では、「急増」Topicを用意することをお勧めします。これにより、データ量を超えたユーザーや過去○○分または○○時間に大量すぎるデータを生み出したユーザーを、実行中にこのTopicに強制的に移動させることができます。こうすることで、ほかのTopicに不要なトラフィックの影響を与えずに済み、全員の速度が低下することを避けることができます。高速道路にたとえるとわかりやすいでしょう。高速レーンが淀みなく流れるように、動きの遅い車両は別のレーンに移動することが期待されます。KafkaのTopicをレーン、イベントを車両だと考えてみてください!

一般的に、データソースごとにTopicを分けることで、ソースを分離することができます。Kafkaでは、Topicごとにパーティションの数を設定できます。つまり、TopicごとにLogstashのインスタンスをスケール調整できるわけです。特定のソースについて、将来的に多くの容量が必要になると予測される場合、いつでも将来に備えて多めにパーティションを切ることができます。

Topicは、データが初めて存在しないTopicにパブリッシュされたときにその場で作成することも、手動で事前作成することも可能です。

kafka-topics.sh --zookeeper zk_host:port --create --topic user1
    --partitions 8 --replication-factor 1<span></span>

パーティション

そろそろパーティションの話をすべきですね! Kafkaのドキュメントには次のような記載があります。

“「パーティションカウントは、Topicをいくつのログで共有するかを制御します。パーティションカウントの効果はいくつかあります。まず、各パーティションは完全に1つのサーバー上に収まる必要があります。したがって、パーティションが20個あれば、フルデータセット(および読み書きの負荷)を扱えるサーバーは20個以下となります(複製はカウントしない)。つまり、パーティションカウントがConsumerの最大並列処理数に影響します」”

本質的に、パーティション数が増えれば増えるほど、データ消費時のスループットが上がります。Producerの立場から、Kafkaはどのデータをどのパーティションに置くかを制御するオプションを提供しています。デフォルトでは、Logstashを使用すると、データは総当たり方式でパーティションに割り当てられます。Logstashでmessage_keyを指定することで、どのようにデータをパーティションに割り当てるかを制御できます。ZooKeeperの制限を回避するためにTopic/パーティションの数を減らす方が効率的な場合もありますが、user_idmessage_keyとして使用することで、複数のユーザーを固定パーティションでグループ化できます。キーが指定されている場合、パーティションは、キーのハッシュを使用して選択されます。

Kafkaで認識しておくべきもう1つの重要な特性は、メッセージの順番です。Kafkaが保証するのは、同じパーティション内のメッセージの順番に限られます。データソースからのメッセージにキーがない場合、複数のパーティションに分散され、Kafkaは消費時の順番を保証しません。データが不変である場合、特にログ分析ユースケースについては、これは許容可能な特性です。順番の確実性が求められる場合は、必ずデータを単一パーティションに置くようにしてください。

Consumerグループ:拡張性と障害許容性

同様のTopicからのデータを処理する複数のKafka Consumerは、クラスタ内の固有名によって指定されたConsumerグループを形成します。Kafkaにパブリッシュされたメッセージは、グループ内のインスタンス全体に分散されますが、各メッセージはグループ内の1つのConsumerのみによって(つまり重複なく)扱われます。Kafkaから読み取っているLogstashインスタンスは、logstashと呼ばれるデフォルトグループIDを持つConsumerグループを形成します。いつでも新しいLogstashインスタンスをスピンアップして、講読しているTopicの読み込みスループットを拡張できます。デフォルトでは、開始された新しいLogstashインスタンスは、logstashConsumerグループに加わります。新しいConsumerがConsumerグループに加わるというこのプロセスは、Kafkaのリバランスをトリガします。以前にも言及した通り、Logstashは高レベルなKafkaConsumerを使用しているため、Kafkaのライブラリにリバランスのロジックをデリゲートします。このプロセスは、ZooKeeperで利用可能なメタデータに基づいて、自動的に現在のConsumerにパーティションを再割り当てします。複数のLogstashインスタンスを使用するもう1つの理由は、障害許容性を高めることです。1つのインスタンスがダウンした場合、Kafkaはリバランス処理を継続し、割り当てを既存のLogstashインスタンスに分散します。

このすべてが、Logstashインプットのconsumer_threads設定に密接に関連しています。この設定は、Kafkaのパーティションから消費するスレッドの数を制御します。完璧なバランスが得られるように、パーティションと同じ数のスレッドを持つことが理想的です。スレッドの数がパーティションの数を上回ると、一部のスレッドですることがなくなります。スレッドの数がパーティションの数を下回ると、一部のスレッドは複数のパーティションから消費することになります。

16個のパーティションを持つapache_logsのシナリオを考えてみましょう。この設定では、8台のコアマシンで1つのLogstashインスタンスをスピンアップ可能です。

input {
   kafka {
   zk_connect => "kafka:2181"
   group_id => "logstash"
   topic_id => "apache_logs"
   consumer_threads => 16
  }
}

あるいは、2台のマシンで2つのLogstashインスタンスをピンアップして、consumer_threadsをそれぞれ8個に設定することもできます。2つ目の展開の方が良い選択です。マシンのCPUを十分に活用しながら、破局的な故障に備えた障害許容性を追加します。

通常、パーティションの数はLogstashのスレッド/インスタンスの数の倍数となるようにします。これにより、確実にインスタンスのバランスが取れます。Elasticsearchのシャーディング同様、パーティショニングにより後から処理能力を追加できます。

シリアル化のフォーマット

Kafkaは、キューにあるバイト配列を使用してメッセージを保持します。したがって、Kafkaには、ほぼどんなフォーマットでも渡せますが、一般的にはコンパクトで高速なシリアル化フォーマットの使用が推奨されます。Kafkaには、アウトプットにvalue_serializerを、インプットにdecoder_class指定することでシリアル化されたメッセージフォーマットを扱うことができます。Logstashの扱いに慣れたユーザーであれば、おそらくコーデックが使えるのではないかと考えているでしょう。KafkaのTopicのメッセージ表示を管理するには、Logstashのコーデックを活用することも、Kafkaのシリアライザーを使用することも可能です。

ほかにKafkaの環境に適しているLogstashコーデックは、plainavroavro_schema_registryです。

自分でシリアライザー/デシリアライザーを書きたい場合は、お好きなJVM言語で書くことができます。このようなクラスはLogstashのクラスパスにはないため、明示的に適切なライブラリをJAVAのクラスパスに追加する必要があります

export CLASSPATH=$CLASSPATH:/path/to/kafkaserializers.jar; bin/logstash -f ..

Logstashのインプットでのメッセージのデコードはシングルスレッドで行われるため、JSONのようなコストの高いシリアライズ化フォーマットでは、パイプライン全体のパフォーマンスを下げることになる点にご留意ください。

まとめ

この記事では、Kafkaの基本的なコンセプトをいくつか取り上げ、Elastic Stackでの使用方法を紹介しました。次回の記事では、運用面に話を移し、KafkaとLogstashを併用する場合のヒントを紹介します。なお、質問がある場合はご遠慮なくフォーラムまたはtwitterまでお寄せください!

アップデート: もっと詳しく知りたくなりましたか?シリーズ2回目では、運用と本番環境への展開のちょっとしたコツについてお話しします。