エンジニアリング

Logstashを使ってデータを区分し、複数のアウトプットに送る

Logstashは、オープンソースのサーバーサイドデータ処理パイプラインです。Logstashでデータを投入、変換し、1つ以上のアウトプット先に送ることができます。本ブログ記事ではLogstashを使って複数の株式市場のデータを投入し、各市場に対応する個別のアウトプットに送る方法をご紹介します。実行する手順は次の通りです:

  1. 株式市場のインプットストリームから来るドキュメントのコピーを作成する。
  2. 各コピーをフィルタリングし、該当の市場で有効なフィールドのみを持たせる。
  3. 各コピーにメタデータを追加し、どの株式市場のデータが入っているかわかるようにする。
  4. 各ドキュメントのメタデータから判断し、ドキュメントを適切なアウトプットに送る

留意点として、本ブログ記事が説明する機能の一部をアーカイブすると考えられるパイプラインツーパイプライン通信(6.5でベータリリース)は、本記事で紹介する手順では使用していません。

インプットファイル例

Logstashにインプットするファイルには、株式市場でベンチーマークとなる指数を含むCSVファイルを使用します。CSVエントリの一例として、以下のようなものがあります:

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

コンマで区切られた値は、"time"(時刻)と、"DAX"、"SMI"、"CAC"、"FTSE"の各市場の指数となる値を表します。上の4行を"stocks.csv"と呼ばれるCSVファイルにコピー&ペーストすると、このLogstashパイプライン例でインプットとして使うことができます。

Logstashパイプライン例

本セクションで後ほど示す記述は、次の動作を実行するLogstashパイプラインです。

  1. CSVファイルから、CSV形式のインプットとして株式市場の値を読み込む。
  2. CSVインプットの各行をJSONドキュメントにマップする(CSV列を"time"、"DAX"、"SMI"、"CAC"、"FTSE"の各JSONフィールドにマップする)。
  3. timeフィールドをUnixフォーマットに変換する。
  4. clone filterプラグインを使用し、(元のドキュメントに加えて)各ドキュメントにつき2つコピーを作成する。"clone filter"は各ドキュメントのコピーに新規の"type"フィールドを自動で追加する。"type"は所与のクローンの配列名に対応する。(今回はtypeが"clone_for_SMI"や"clone_for_FTSE"となるよう定義し、各クローンが最終的に"SMI"または"FTSE"のいずれかの市場から来るデータだけを持つように設定しています。)
  5. 各クローンに対して次の動作を実行する:
    1. prune filterプラグインを使用して、特定の市場向けにホワイトリスト化されたフィールドを除くすべてのフィールドを削除する。
    2. 各ドキュメントに、クローン関数が追加した"type"に対応するメタデータを追加する。(prune関数を使ってclone関数が挿入した"type"を削除していますが、"type"情報は正しいアウトプットにドキュメントを送る際に必要となることから、この手順が必要となります。)
  6. Logstash用のElasticsearchアウトプットプラグインを使用して、異なるElasticsearchアウトプットに各市場のドキュメントを書き出す。アウトプットの定義は、手順5で追加したメタデータフィールドの値に基づく。(コードをシンプル化するため、各ElasticsearchアウトプットはローカルのElasticsearchクラスター内で一意のインデックスに書き込む設定となっています。複数のクラスターをアウトプットとして使用する必要がある場合、各Elasticsearchアウトプットの宣言は、一意のElasticsearchホストを指定して簡単に修正することができます。)

次の記述が、上の手順を実行するLogstashパイプラインです(対応する手順の番号が、コメント形式で挿入されています)。 このパイプラインを実行するには、"clones.conf"というファイルに次の記述をコピーします。

## 手順1
input {
  file {
    # stocks.csvファイルを確実に使用できるよう、パスを編集してください
    path => "${HOME}/stocks.csv"
    # 以下でLogstashの実行のたびに完全なインプットの 
    # 再読み込みを確実に実行します(デバッグに役立ちます)
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## 手順2
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## 手順3  
date {
    match => ['time', 'UNIX']
  }
## 手順4
  # 以下の行は各ドキュメントにつき2つずつ 
  # コピーを作成します(オリジナルを含め合計3つ)。 
  # 各コピーには自動的に配列の
  # 名前に対応する"type"が 
  # 追加されます
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## 手順5
  if [type] == 'clone_for_SMI' {
    # "SMI"以外のすべてを削除します
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## 手順6
output {
  # 以下のstdoutへのアウトプットは 
  # デバッグ目的であり、削除できます
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Logstashパイプラインをテストする

事例のCSVデータを投入するパイプラインをテストするには、次のコマンドを実行します。その際、必ずパスを修正し、お使いのシステムで実際に使用した正しいパスとなるようにしてください。留意点として、"config.reload.automatic"の指定はオプションです。このオプションには、Logstashを再起動せずに"clones.conf"を自動でリロードできるというメリットがあります。

./logstash -f ./clones.conf --config.reload.automatic

Logstashが"stocks.csv"ファイルを読み込んで処理を完了すると、"smi_data"、"ftse_data"、"stocks_original"の3つのインデックスを確認することができます。

SMIインデックスを確認する

GET /smi_data/_search

このコマンドで、ドキュメントが次の構造で表示されます。"smi_data"インデックスで、"SMI"市場のデータのみが表示されることに注目しましょう。

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score":1,
        "_source": {
          "SMI":1688.5    
        }
      }

FTSEインデックスを確認する

GET /ftse_data/_search

このコマンドで、ドキュメントが次の構造で表示されます。"ftse_data"インデックスのドキュメントで、"FTSE"フィールドのみが表示されることに注目しましょう。

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id":"AgRskWUBsYalOV9y9hL0",
        "_score":1,
        "_source": {
          "FTSE":2448.2
        }
      }

オリジナルドキュメントインデックスを確認する

GET /stocks_originals/_search

このコマンドで、ドキュメントが次の構造で表示されます。"stocks_original"インデックスで、元のドキュメントがフィルタリングされていないことがわかります。

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score":1,
        "_source": {
          "host":"Alexanders-MBP",
          "@timestamp":"2017-01-01T00:30:00.000Z",
          "SMI":1678.1,
          "@version":"1",
          "message":"1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC":1772.8,
          "DAX":1628.75,
          "time":"1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE":2443.6
        }
      }

まとめ

本ブログ記事では、Logstashの機能の一部をご紹介しました。具体的には、複数の株式市場のデータをLogstashに投入し、処理を行った後にデータを個別のアウトプットに送る使い方を説明しています。LogstashやElastic Stackの試用に関して質問がおありの場合は、公開ディスカッションフォーラムを活用されることをおすすめしています。