Tech Topics

「メールけいしちょう」を Elastic Stack で可視化する

メールけいしちょうは、警視庁が提供している犯罪発生情報や防犯情報をメールで配信するサービスです。
配信される内容は一部を除き、 CC BY 2.1 JP ライセンスに基づき、使用することができます。

以下のようなメッセージがメールで配信されます。

Subject: 玉川警察署(子ども(暴行))
Body:
4月16日(土)、午後4時40分ころ、世田谷区奥沢1丁目の路上で、児童が通行中、男に突き飛ばされました。(犯人(男)の特徴については、50歳代、170cm 位、中肉、口ひげ、茶色っぽい上衣、黒色っぽいズボン)
【問合せ先】玉川警察署 03-3705-0110(内線2612)

本データを Logstash を使って Elasticsearch でインデックスし、Kibana で可視化する方法を紹介します。バージョン 5.0.0-alpha1 を前提に記載しますが、Elasticsearch の mapping を適切にアレンジすれば、以前のバージョンでも適用可能です。

データの取得

あらかじめ「メールけいしちょう」の登録を行い、Gmail で受信することにします。Logstash で取得するために、IMAP を有効 にします。二段階認証プロセスを有効にしている場合には、アプリパスワード を取得しておきます。また、取得したメールは予め仕分けておくと便利です。今回は Police フォルダに仕分けます。

Gmail のユーザ名を _IMAP_USER_、パスワードを _IMAP_PASSWORD_ とすると,Logstash の input の設定は以下のようになります。

input {
  imap {
    host => "imap.gmail.com"
    port => 993
    user => "_IMAP_USER_"
    password => "_IMAP_PASSWORD_"
    folder => "Police"
    type => "police"
    check_interval => 300
    codec => plain { charset => "ISO-2022-JP" }
  }
}

本メールは “content-type: text/plain; charset=ISO-2022-JP” で配信されているので、plain codec plugin で指定します。

imap input plugin は、コミュニティプラグインですので、承知の上、ご利用ください。

データの構造化

配信されるメッセージは、機械的な処理をそれほど考慮していないようですが、決まった書式がありそうですので、構造化してから、Elasticsearch でインデックスします。メール本文については、以下のように grok filter でパースし、事件の発生日時、場所などを取得します。

grok {
  match => { "message" => "%{DATA:[@metadata][datetime]}ころ、%{NOTSPACE:city}(区|市)%{NOTSPACE:area}(の|付近)(%{GREEDYDATA:place}|)で、%{GREEDYDATA}" }
}

また、件名から担当警察署、事象の種類を取得します。

grok {
  match => { "subject" => "%{NOTSPACE:police_station}警察署\(%{NOTSPACE:incident}\)" }
}

事象発生日時を本文より取得し、Logstash のタイムスタンプとして採用します。最終的な filter 部分の設定は以下の通りです。

filter {
  grok {
    match => { "message" => "%{DATA:[@metadata][datetime]}ころ、%{NOTSPACE:city}(区|市)%{NOTSPACE:area}(の|付近)(%{NOTSPACE:place}|)で、%{GREEDYDATA}" }
  }
  date {
    match => ["[@metadata][datetime]", "M月d日(E)、aK時m分"]
    locale => ja
    timezone => "Asia/Tokyo"
  }
  grok {
    match => { "subject" => "%{NOTSPACE:police_station}警察署\(%{NOTSPACE:incident}\)" }
  }
}

本書式にあてはまらないメッセージも配信されるようですので、それぞれに適した grok filter を作成してください。

Elasticsearch への出力

elasticsearch output plugin を使用して、Elasticsearch にドキュメントとして登録します。インデックス名に日付を使用する場合には、インデックスのサイズが適切になるように検討してください。以下の場合には、月次で新しいインデックスが作成されます。

output {
  stdout { codec => dots }
  elasticsearch {
    hosts => ["http://localhost:9200/"]
      index => "police-mail-%{+YYYY.MM}"
  }
}

Mapping

メールの件名、本文、構造化したフィールドには日本語が含まれるため、analyzer として kuromoji を使用します。$ bin/elasticsearch-plugin install kuromoji コマンドでインストールすることができます。
以下のテンプレートを登録します。

curl -XPUT localhost:9200/_template/police-mail -d '{
  "template": "police-mail-*",
  "mappings": {
    "police": {
      "properties": {
        "area": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "city": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "subject": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "police_station": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "place": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "message": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        },
        "incident": {
          "analyzer": "kuromoji",
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}'

police-mail-* に一致するインデックスには、この mapping が適用されます。textkeyword 型フィールドは、Elasticsearch 5.0 で導入されました。それぞれ strings 型の analyzednot_analyzed フィールドに対応します。

Logstash の実行

input {...}, filter {...}, output {...} を記載した設定ファイルを作成し、$ logstash -f mail_police.conf などとして Logstash を起動します。メールを一件処理するたびにドット . が表示されます。

Kibana での可視化

Kibana の Visualize 画面で、Pie chart を作成します。Split Slices の Aggregation に Terms を、Field に incident.keyword を選択します。本フィールドは not_analyzed フィールドです。メールの件名から取得した事象の件数に基づいた Pie chart が作成できます。

スクリーンショット 2016-04-19 16.13.24.png

analyzed フィールドでも特定の用語が含まれるドキュメントの件数をカウントすることができます。同じく Visualize 画面より Line chart を作成します。インデックスを選んだら、X-Axis に Date Histogram を、Add sub-buckets から Split Lines を押し、Sub Aggregation に Filters を 選択します。カウントしたい用語を Query String Query 形式で入力します。複数の Filter を設定することも可能です。

スクリーンショット 2016-04-19 16.14.55.png


まとめ

Logstash を使用すると、メールで受信したメッセージを Elasticsearch に取り込むことができます。監視システムなどによって作成される通知メッセージであれば、多くは定型化されていますので、grok filter を使用して、容易に構造化することが可能です。また、構造化せずに取り込んでも、Filters Aggregation を使用すると、ある事象が発生した回数などをカウントすることができます。すでに運用しているシステムを Kibana で可視化するなど、ぜひおためしください。