エンジニアリング

時系列データをElasticsearchでクエリ&アグリゲーションする方法

Elasticsearchは、Luceneデータベース内に検索インデックスを保つ検索エンジンとして誕生しました。そのルーツとは対照的に、Elasticsearchは非常にパフォーマンスにすぐれた、クラスター化可能かつスケーラブルなデータストアとして進化を続けてきました。そのインデックス形式には今なお検索エンジンとしてのルーツを反映しているものの、現在はあらゆる目的で、あらゆるタイプのユーザーに使用されています。

Elasticsearchの用途の1つに、時系列データの格納/処理/検索があります。時系列データの特徴は、すべてのデータポイントが正確なタイムスタンプに関連付けられていることです。1つのデータポイントは、多くの場合、“ある特定の瞬間に実施された何らかの類の測定”を意味します。たとえば株価や、科学的観測、あるいはサーバーの負荷といったものが、これに該当します。

時系列データの処理に特化したデータベースの実装はいくつか存在しますが、時系列を格納/クエリする共通の(一般的な)形式というものが存在するわけではありません。論理的には、あらゆるデータベースエンジンで時系列データを扱うことが可能です。

抽象的な形式で表すと、時系列データベースのエントリは次の要素で構成されます。

  • 時系列名
  • 1つのタイムスタンプ
  • 1つの値
  • (系列についてより詳しい情報を含む)一連のkey-valueペア、あるいはタグ

サーバー監視のユースケースではかならず、時系列が属するホストを指定する1つのkey-valueペアが存在します。しかし、追加する付加的な情報は何であってもよく、後で特定の一連のホストのメトリックをリクエストするだけの目的に使用することもできます。一連のホストとは、たとえば運用環境に使われる、特定のサービスを実行するホスト群や、特定のクラウドプロバイダー上で実行されているインスタンスなどです。

より実践的に説明するため、Metricbeatデータを例に、Elasticsearchクエリを使用してデータから特定の時系列情報を選別する方法を示してみます。

各Metricbeatドキュメントは次の情報を含みます。

  • 1つのタイムスタンプ
  • 実際の時系列データ
    • Metricbeatについて、この部分のドキュメントの形式はMetricbeatのドキュメントに記載されています。systemモジュールのcpuメトリックセットについて詳しくは、 メトリックセットドキュメントをご参照ください。
  • ドキュメントに含まれる、メトリック自体に関するメタデータ(MetricbeatはECSフィールドのevent.moduleevent.datasetを使用してドキュメントを作成するMetricbeatモジュールおよび、ドキュメントに含めるメトリックセットを指定します。)
    • これは、時系列データの取得に先立ち、はじめにどのメトリックが存在するか確認する上で役立ちます。
  • インスタンスに関するメタデータ(物理ホストか、仮想マシンか、Kubernetes podやDockerコンテナーのようなより小さいエンティティか)
    • このメタデータはElastic Common Schemaに従い、ECSを使用する他のソースからくるデータとマッチさせることができます。

たとえば、system.cpuメトリックセットからくるMetricbeatはこのようになります。_sourceオブジェクトのインラインコメントは、そのフィールドについてより詳しい情報があることを示しています。

注:読みやすくするため、ドキュメントではJSON中に#を付したコメントを含んでいます。

{ 
  "_index" : "metricbeat-8.0.0-2019.08.12-000001",
  "_type" : "_doc",
  "_id" : "vWm5hWwB6vlM0zxdF3Q5",
  "_score" :0.0,
  "_source" : { 
    "@timestamp" :"2019-08-12T12:06:34.572Z",
    "ecs" : { # ECS metadata 
      "version" :"1.0.1" 
    },
    "host" : { # ECS metadata 
      "name" : "noether",
      "hostname" : "noether",
      "architecture" : "x86_64",
      "os" : { 
        "kernel" :"4.15.0-55-generic",
        "codename" : "bionic",
        "platform" : "ubuntu",
        "version" :"18.04.3 LTS (Bionic Beaver)",
        "family" : "debian",
        "name" :"Ubuntu" 
      },
      "id" :"4e3eb308e7f24789b4ee0b6b873e5414",
      "containerized" : false 
    },
    "agent" : { # ECS metadata 
      "ephemeral_id" :"7c725f8a-ac03-4f2d-a40c-3695a3591699",
      "hostname" : "noether",
      "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce",
      "version" :"8.0.0",
      "type" : "metricbeat" 
    },
    "event" : { # ECS metadata 
      "dataset" : "system.cpu",
      "module" : "system",
      "duration" :725494 
    },
    "metricset" : { # metricbeat metadata 
      "name" : "cpu" 
    },
    "service" : { # metricbeat metadata 
      "type" : "system" 
    },
    "system" : { # metricbeat time series data  
      "cpu" : { 
        "softirq" : { 
          "pct" :0.0112 
        },
        "steal" : { 
          "pct" :0 
        },
        "cores" :8,
        "irq" : { 
          "pct" :0 
        },
        "idle" : { 
          "pct" :6.9141 
        },
        "nice" : { 
          "pct" :0 
        },
        "user" : { 
          "pct" :0.7672 
        },
        "system" : { 
          "pct" :0.3024 
        },
        "iowait" : { 
          "pct" :0.0051 
        },
        "total" : { 
          "pct" :1.0808 
        } 
      } 
    } 
  } 
}

まとめると、Metricbeatドキュメント内で時系列データとメタデータは混ざっており、求めるデータを入手するには、ドキュメント形式に関する知識が必要です。

逆に時系列データを処理/分析/可視化したい場合、典型的には、データは以下のように表のような形式となっています。

<series name> <timestamp> <value> <key-value pairs> 
system.cpu.user.pct 1565610800000 0.843 host.name=”noether” 
system.cpu.user.pct 1565610800000 0.951 host.name=”hilbert” 
system.cpu.user.pct 1565610810000 0.865 host.name=”noether” 
system.cpu.user.pct 1565610810000 0.793 host.name=”hilbert” 
system.cpu.user.pct 1565610820000 0.802 host.name=”noether” 
system.cpu.user.pct 1565610820000 0.679 host.name=”hilbert”

Elasticsearchクエリは、プログラミングによって時系列データを表に近い形式で取得する上で役立ちます。以下は、その手法の例です。ご自身でクエリをテストするには、Elasticsearchインスタンスと、system.cpuおよびsystem.networkメトリックセット向けにデータをシッピングする実行中のMetricbeatインストールが必要です。Metricbeatの概要をコンパクトにまとめた入門ガイドドキュメントもありますので、ご覧ください。

すべてのクエリは、KibanaのDev Tools(開発ツール)コンソールから実行することができます。使用経験がない方は、Kibanaコンソールのドキュメントで簡単な概要を確認することができます。事例のクエリのホスト名は変更する必要がありますので、ご注意ください。

ここでは、Metricbeatがデフォルトのまま設定されていると仮定します。その場合、1日に1つのインデックスが作成され、インデックス名は‘metricbeat-バージョン-日付-カウンター’となります。たとえばmetricbeat-7.3.0-2019.08.06-000009のようになります。これらのインデックスを一度にクエリするには、ワイルドカードを使用します。

クエリ例:

GET metricbeat-*/_search

レスポンスは次のようになります。

{ 
  "took" :2,
  "timed_out" : false,
  "_shards" : { 
    "total" :1,
    "successful" :1,
    "skipped" :0,
    "failed" :0 
  },
  "hits" : { 
    "total" : { 
      "value" :10000,
      "relation" : "gte" 
    },
    "max_score" :1.0,
    "hits" : [...] 
  } 
}

もちろん、このクエリが対象とするドキュメントは、Elasticsearchが1つのクエリで返せるドキュメントの上限を超えています。ここでは実際のヒット数は省略されていますが、結果をスクロールして、上で注釈がついているドキュメントと比較することが可能です。

監視するインフラのサイズによって、膨大な数のMetricbeatドキュメントが存在することもあります。しかし、(記録を)開始した最初の時点からの時系列を対象としてクエリすることはまれです。ここでは、日付範囲を直近5分にして実行してみましょう。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": { 
    "range": { 
      "@timestamp": { 
        "gte": "now-5m" 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  "took" :4,
  "timed_out" : false,
  "_shards" : { 
    "total" :1,
    "successful" :1,
    "skipped" :0,
    "failed" :0 
  },
  "hits" : { 
    "total" : { 
      "value" :30,
      "relation" : "eq" 
    },
    "max_score" :0.0,
    "hits" : [...] 
  } 
}

ずっと管理しやすいサイズになりました。しかし、このクエリを実行するシステムには、レポートするホストが1つしかありません。したがって、運用環境でのヒット数は依然として大きな値となってしまいます。

特定のホストのすべてのCPUデータを取得する場合、Elasticsearchクエリで最もネイティブな方法は、ホスト名とメトリックセットsystem.cpuにフィルターを追加するやり方です。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": { 
    "bool": { 
      "filter": [ 
        { 
          "range": { 
            "@timestamp": { 
              "gte": "now-5m" 
            } 
          } 
        },
        { 
          "bool": { 
            "should": [ 
              { 
                "match_phrase": { 
                  "host.name": "noether" 
                } 
              },
              { 
                "match_phrase": { 
                  "event.dataset": "system.cpu" 
                } 
              } 
            ] 
          } 
        } 
      ] 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  "took" :8,
  "timed_out" : false,
  "_shards" : { 
    "total" :1,
    "successful" :1,
    "skipped" :0,
    "failed" :0 
  },
  "hits" : { 
    "total" : { 
      "value" :30,
      "relation" : "eq" 
    },
    "max_score" :0.0,
    "hits" : [...] 
  } 
}

クエリは依然として大量のドキュメントを返していますが、そのすべてにMetricbeatによってsystem.cpuメトリックセット向けに送信された完全なデータが含まれています。いくつかの理由から、これは“非常に役立つ結果”であるとは言えません。

第一に、時間範囲全体について、すべてのドキュメントを取得する必要が生じることがあります。設定された上限に達すると、Elasticsearchは一発で返してくれなくなります。順位付けを試みますが、このクエリでは意味を成しません。タイムスタンプでソートされたドキュメントを返すことはなくなります。

第二に、求めていたのは各ドキュメントのごく一部分だけです。タイムスタンプといくつかのメトリック値、可能なら追加のメタデータフィールドをいくつか、求めていました。Elasticsearchから_source全体が返され、そのクエリ結果からデータを選び出す作業では、非常に効率が悪くなります。

そしてこの問題を解消するアプローチの1つが、Elasticsearchのアグリゲーションの使用です。

活用事例1:ダウンサンプリングされたCPUパーセンテージ

はじめに日付ヒストグラムを見てみましょう。日付ヒストグラムアグリゲーションを使うと、タイムインターバル(時間間隔)につき1つの値が返されます。返されたバケットは時間順に並べられ、データに応じてインターバル(間隔)やバケットのサイズを指定することができます。この事例では、Metricbeatがデフォルトでシステムモジュールから10秒おきにデータを送信しているため、インターバルサイズは10秒に選択されています。上位にあるsize:0パラメーターは、実際にヒットしたドキュメントは必要なく、アグリゲーションだけが必要であり、ドキュメントは返されないという振る舞いを指定しています。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size":0,
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp",
        "fixed_interval":"10s" 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : { 
    "total" : { 
      "value" :30,
      "relation" : "eq" 
    },
    "max_score" : null,
    "hits" : [ ] 
  },
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" :"2019-08-12T13:03:20.000Z",
          "key" :1565615000000,
          "doc_count" :1 
        },
        { 
          "key_as_string" :"2019-08-12T13:03:30.000Z",
          "key" :1565615010000,
          "doc_count" :1 
        },
        { 
          "key_as_string" :"2019-08-12T13:03:40.000Z",
          "key" :1565615020000,
          "doc_count" :1 
        },
        ...
      ] 
    } 
  } 
}

各バケットにつき、keyのタイムスタンプと、key_as_string(人間に読める日付時間ストリングを含むので役立ちます)、バケットに残されたドキュメントの数が返されます。

doc_countが1となっているのは、バケットサイズがMetricbeatのレポート周期と一致しているからです。それ以外に特に役立つことはないため、実際のメトリック値を表示するために別のアグリゲーションを追加します。このステップで、どのようなアグリゲーションにするか決定します。数値で候補となるのは、avgminmaxなどです。しかし、バケットにつき1つのドキュメントしかない場合、どれを選択するかはあまり重要ではありません。そのことを示したのが次の例です。10秒のバケット内のsystem.cpu.user.pctメトリックについて、avgminmaxのアグリゲーションを返しています。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size":0,
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp",
        "fixed_interval":"10s" 
      },
      "aggregations": { 
        "myActualCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        },
        "myActualCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        },
        "myActualCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : {...},
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" :"2019-08-12T13:12:40.000Z",
          "key" :1565615560000,
          "doc_count" :1,
          "myActualCpuUserMin" : { 
            "value" :1.002 
          },
          "myActualCpuUserAvg" : { 
            "value" :1.002 
          },
          "myActualCpuUserMax" : { 
            "value" :1.002 
          } 
        },
        { 
          "key_as_string" :"2019-08-12T13:12:50.000Z",
          "key" :1565615570000,
          "doc_count" :1,
          "myActualCpuUserMin" : { 
            "value" :0.866 
          },
          "myActualCpuUserAvg" : { 
            "value" :0.866 
          },
          "myActualCpuUserMax" : { 
            "value" :0.866 
          } 
        },
        ...
      ] 
    } 
  } 
}

上の通り、myActualCpuUserMinmyActualCpuUserAvgmyActualCpuUserMaxの値は各バケットで同じです。より短い間隔でレポートされる時系列の生の値を取得する必要がある場合は、日付ヒストグラムを使ってこれを行うことができます。

しかし大抵の場合、すべてのデータポイントが必要となることはありません。特に、数秒おきに測定していればなおさらです。実際、ほとんどの目的に対しては、データをより粗い粒度にする方が適切です。たとえば、時系列の変化を表示する可視化で、画素数が限られたものである場合、高粒度のデータはレンダリング時に無駄になってしまいます。

時系列データの粒度はしばしば、次のステップの処理要件に合わせてダウンサンプリングされます。ダウンサンプリングを行うと、一定の時間周期内にある複数のデータポイントが減り、1つになります。サーバー監視の例では、データが10秒ごとに測定されていました。しかし大抵の目的に対しては、1分間に含まれるすべての値の平均で十分です。ダウンサンプリングは偶然にも、日付ヒストグラムアグリゲーションがバケットにつき1つ以上のドキュメントを見つけたときに行ったこととまったく同一であり、適切にネストされたアグリゲーションが使用されています。

以下は、1分間のバケットに対するネストされたavgminmaxアグリゲーション結果を示しています。ダウンサンプリングの1つ目の事例としてご紹介しましょう。fixed_intervalの代わりにcalendar_intervalを使用し、バケットの境界を1分に合わせています。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size":0,
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp",
        "calendar_interval":"1m" 
      },
      "aggregations": { 
        "myDownsampledCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        },
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        },
        "myDownsampledCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : {...},
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" :"2019-08-12T13:27:00.000Z",
          "key" :1565616420000,
          "doc_count" :4,
          "myDownsampledCpuUserMax" : { 
            "value" :0.927 
          },
          "myDownsampledCpuUserMin" : { 
            "value" :0.6980000000000001 
          },
          "myDownsampledCpuUserAvg" : { 
            "value" :0.8512500000000001 
          } 
        },
        { 
          "key_as_string" :"2019-08-12T13:28:00.000Z",
          "key" :1565616480000,
          "doc_count" :6,
          "myDownsampledCpuUserMax" : { 
            "value" :0.838 
          },
          "myDownsampledCpuUserMin" : { 
            "value" :0.5670000000000001 
          },
          "myDownsampledCpuUserAvg" : { 
            "value" :0.7040000000000001 
          } 
        },
        ...
      ] 
    } 
  } 
}

上の通り、これでmyActualCpuUserMinmyActualCpuUserAvgmyActualCpuUserMaxの各値が使用するアグリゲーションに応じ、異なる値となりました。

ダウンサンプリングに使用するメソッドは、メトリックによって異なります。CPUのパーセンテージであれば1分間のavgアグリゲーションが適しています。しかし、キューの長さや、システム負荷といったメトリックに対しては、maxアグリゲーションの方がより適切と考えられます。

ここからさらに、Elasticsearchを使って簡単な算数を行い、元のデータには含まれていない時系列を計算することもできます。CPUにavgアグリゲーションを行っているとしましょう。事例を拡張し、ユーザーCPU、システムCPU、システムとユーザーの合計をCPUコアで割った数を返すことにします。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...},   # same as above 
  "size":0,
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp",
        "calendar_interval":"1m" 
      },
      "aggregations": { 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        },
        "myDownsampledCpuSystemAvg": { 
          "avg": { 
            "field": "system.cpu.system.pct" 
          } 
        },
        "myCpuCoresMax": { 
          "max": { 
            "field": "system.cpu.cores" 
          } 
        },
        "myCalculatedCpu": { 
          "bucket_script": { 
            "buckets_path": { 
              "user": "myDownsampledCpuUserAvg",
              "system": "myDownsampledCpuSystemAvg",
              "cores": "myCpuCoresMax" 
            },
            "script": { 
              "source": "(params.user + params.system) / params.cores",
              "lang": "painless" 
            } 
          } 
        } 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : {...},
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" :"2019-08-12T13:32:00.000Z",
          "key" :1565616720000,
          "doc_count" :2,
          "myDownsampledCpuSystemAvg" : { 
            "value" :0.344 
          },
          "myCpuCoresMax" : { 
            "value" :8.0 
          },
          "myDownsampledCpuUserAvg" : { 
            "value" :0.8860000000000001 
          },
          "myCalculatedCpu" : { 
            "value" :0.15375 
          } 
        },
        { 
          "key_as_string" :"2019-08-12T13:33:00.000Z",
          "key" :1565616780000,
          "doc_count" :6,
          "myDownsampledCpuSystemAvg" : { 
            "value" :0.33416666666666667 
          },
          "myCpuCoresMax" : { 
            "value" :8.0 
          },
          "myDownsampledCpuUserAvg" : { 
            "value" :0.8895 
          },
          "myCalculatedCpu" : { 
            "value" :0.15295833333333334 
          } 
        },
        ...
      ] 
    } 
  } 
}

活用事例2:ネットワークトラフィック — terms aggregationとderivative aggregation

Elasticsearchのアグリゲーションが時系列データ役立つことを示すもう少し高度な事例として、system.networkメトリックセットを取り上げてみます。system.networkメトリックセットドキュメントでこの事例に関連するのは、次のような部分です。

{ 
...
"system": { 
        "network": { 
            "in": { 
                "bytes":37904869172,
                "dropped":32,
                "errors":0,
                "packets":32143403 
            },
            "name": "wlp4s0",
            "out": { 
                "bytes":6299331926,
                "dropped":0,
                "errors":0,
                "packets":13362703 
            } 
        } 
    } 
...
}

Metricbeatは、システムに存在するすべてのネットワークインターフェースにつき1ドキュメントを送信します。これらのドキュメントは同じタイムスタンプを持ちますが、ネットワークインターフェースごとのsystem.network.nameフィールドの値が異なります。

この先のアグリゲーションはインターフェースごとに実行する必要があります。そこで先ほどの事例の上位にあった日付ヒストグラムアグリゲーションを、system.network.nameフィールドに対するterms aggregationに変更します。

アグリゲーションされたフィールドを動作させるには、キーワードフィールドとしてマップする必要がある点に注意してください。Metricbeatで提供されるデフォルトのテンプレートで作業している場合は、すでにマッピングで設定されています。デフォルトのテンプレートを使っていない場合は、必要な手順についてMetricbeatテンプレートドキュメントのページに簡単な説明があります。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...}, # same as above 
  "size":0,
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name",
        "size":50 
      },
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp",
            "calendar_interval":"1m" 
          } 
        } 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : {...},
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" :0,
      "sum_other_doc_count" :0,
      "buckets" : [ 
        { 
          "key" : "docker0",
          "doc_count" :29,
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        },
        { 
          "key" : "enp0s31f6",
          "doc_count" :29,
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        },
        { 
          "key" : "lo",
          "doc_count" :29,
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        },
        { 
          "key" : "wlp61s0",
          "doc_count" :29,
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" :"2019-08-12T13:39:00.000Z",
                "key" :1565617140000,
                "doc_count" :1 
              },
              { 
                "key_as_string" :"2019-08-12T13:40:00.000Z",
                "key" :1565617200000,
                "doc_count" :6 
              },
              { 
                "key_as_string" :"2019-08-12T13:41:00.000Z",
                "key" :1565617260000,
                "doc_count" :6 
              },
              { 
                "key_as_string" :"2019-08-12T13:42:00.000Z",
                "key" :1565617320000,
                "doc_count" :6 
              },
              { 
                "key_as_string" :"2019-08-12T13:43:00.000Z",
                "key" :1565617380000,
                "doc_count" :6 
              },
              { 
                "key_as_string" :"2019-08-12T13:44:00.000Z",
                "key" :1565617440000,
                "doc_count" :4 
              } 
            ] 
          } 
        },
        ...
      ] 
    } 
  } 
}

CPUの事例からわかるように、ネストされたアグリゲーションなしの日付ヒストグラムアグリゲーションはdoc_countしか返さないため、あまり役立ちません。

バイトのフィールドには、単調に増加する値が入っています。これらのフィールドには、前回マシンの起動以降に送信、または受信したバイト数の値が含まれるので、測定のたびに増加しているのです。このケースでは、適切なネストされたアグリゲーションはmaxです。したがってダウンサンプリングされた値は最高値であり、バケットの間隔内にある最も新しい測定値ということになります。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size":0,
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name",
        "size":50 
      },
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp",
            "calendar_interval":"1m" 
          },
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            },
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : {...},
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" :0,
      "sum_other_doc_count" :0,
      "buckets" : [ 
        { 
          "key" : "docker0",
          ...
        },
        { 
          "key" : "enp0s31f6",
          ...
        },
        { 
          "key" : "lo",
          ...
        },
        { 
          "key" : "wlp61s0",
          "doc_count" :30,
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" :"2019-08-12T13:50:00.000Z",
                "key" :1565617800000,
                "doc_count" :2,
                "myNetworkInBytesMax" : { 
                  "value" :2.991659837E9 
                },
                "myNetworkOutBytesMax" : { 
                  "value" :5.46578365E8 
                } 
              },
              { 
                "key_as_string" :"2019-08-12T13:51:00.000Z",
                "key" :1565617860000,
                "doc_count" :6,
                "myNetworkInBytesMax" : { 
                  "value" :2.992027006E9 
                },
                "myNetworkOutBytesMax" : { 
                  "value" :5.46791988E8 
                },
                "myNetworkInBytesPerSecond" : { 
                  "value" :367169.0,
                  "normalized_value" :6119.483333333334 
                },
                "myNetworkoutBytesPerSecond" : { 
                  "value" :213623.0,
                  "normalized_value" :3560.383333333333 
                } 
              },
              ...
            ] 
          } 
        },
        ...
      ] 
    } 
  } 
}

derivative aggregationを使うと、単調に増加する計数から1秒当たりのバイト増加率を取得することができます。オプションのパラメーターunitがこのアグリゲーションに渡されると、normalized_valueフィールドでユニット当たりの求める値を返します。

クエリ例:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size":0,
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name",
        "size":50 
      },
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp",
            "calendar_interval":"1m" 
          },
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            },
            "myNetworkInBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkInBytesMax",
                "unit":"1s" 
              } 
            },
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            },
            "myNetworkoutBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkOutBytesMax",
                "unit":"1s" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

レスポンスは次のようになります。

{ 
  ...,
  "hits" : {...},
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" :0,
      "sum_other_doc_count" :0,
      "buckets" : [ 
        { 
          "key" : "docker0",
          ...
        },
        { 
          "key" : "enp0s31f6",
          ...
        },
        { 
          "key" : "lo",
          ...
        },
        { 
          "key" : "wlp61s0",
          "doc_count" :30,
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" :"2019-08-12T14:07:00.000Z",
                "key" :1565618820000,
                "doc_count" :4,
                "myNetworkInBytesMax" : { 
                  "value" :3.030494669E9 
                },
                "myNetworkOutBytesMax" : { 
                  "value" :5.56084749E8 
                } 
              },
              { 
                "key_as_string" :"2019-08-12T14:08:00.000Z",
                "key" :1565618880000,
                "doc_count" :6,
                "myNetworkInBytesMax" : { 
                  "value" :3.033793744E9 
                },
                "myNetworkOutBytesMax" : { 
                  "value" :5.56323416E8 
                },
                "myNetworkInBytesPerSecond" : { 
                  "value" :3299075.0,
                  "normalized_value" :54984.583333333336 
                },
                "myNetworkoutBytesPerSecond" : { 
                  "value" :238667.0,
                  "normalized_value" :3977.7833333333333 
                } 
              },
              { 
                "key_as_string" :"2019-08-12T14:09:00.000Z",
                "key" :1565618940000,
                "doc_count" :6,
                "myNetworkInBytesMax" : { 
                  "value" :3.037045046E9 
                },
                "myNetworkOutBytesMax" : { 
                  "value" :5.56566282E8 
                },
                "myNetworkInBytesPerSecond" : { 
                  "value" :3251302.0,
                  "normalized_value" :54188.36666666667 
                },
                "myNetworkoutBytesPerSecond" : { 
                  "value" :242866.0,
                  "normalized_value" :4047.766666666667 
                } 
              },
              ...  
            ] 
          } 
        },
        ...
      ] 
    } 
  } 
}

ご紹介したクエリはすべて、お使いのクラスターでお試しいただくことができます。クラスターをデプロイしていない方は、Elastic CloudのElasticsearch Serviceで無料トライアルにご登録いただくか、Elastic Stackのデフォルト配布パッケージをダウンロードしてお試しいただけます。Metricbeatでシステムのデータを送り、クエリしてみましょう。