在 Elasticsearch 中对时序型数据进行查询和聚合
最初,Elasticsearch 是一个搜索引擎,将搜索索引保存在 Lucene 数据库中。然而,Elasticsearch 从那时起已经历了巨大演变,现已成为一个高性能、集群式的可扩展数据存储。尽管从索引格式上仍能看出它的渊源,但 Elasticsearch 现在已被各类用户广泛用于各种用途。
它的用途之一便是存储、处理和检索时序型数据。时序型数据的特点是每个数据点都有一个相关的准确时间戳。最常见的情形是,一个数据点代表在具体时间点得到的某种测量结果,可以是股价、科学观察值或者服务器负载。
尽管已有多个专门处理时序型数据的数据库实施方案,但是在存储和查询时序型数据时仍没有通用格式,而且理论上所有数据库引擎均可用来处理时序型数据。
从抽象意义上讲,一个时序型数据条目包括下列内容:
- 序列名称
- 时间戳
- 值
- 一个键值对(亦称标签)集合,包括序列的更多相关信息
在服务器监测用例中,肯定会有一个键值对用以指明时序型数据属于哪个主机,但还可以添加任何其他信息,而且添加的这些信息之后可用来请求只获取有关特定主机集合的指标。举一些例子,运行特定服务的主机,仅属于生产环境的主机,或者在特定云服务提供商平台上运行的实例。
为了更接地气,我们使用 Metricbeat 数据作为示例来看一下如何使用 Elasticsearch 查询来从数据中筛选出特定的时序型信息。
每个 Metricbeat 文档都包括下列信息:
- 时间戳
- 真正的时序型数据
- 对 Metricbeat 而言,关于文档中这一部分格式的描述详见 Metricbeat 文档。如要详细了解
system
模块的cpu
指标集,请参见指标集文档。
- 对 Metricbeat 而言,关于文档中这一部分格式的描述详见 Metricbeat 文档。如要详细了解
- 文档中所包含的有关指标本身的元数据。Metricbeat 使用 ECS 字段
event.module
和event.dataset
来指定创建文档时使用的是哪个 Metricbeat 模块,以及文档中包含哪个指标集。- 在您尝试检索时序型数据之前,这些信息能够帮您先了解文档中有哪些指标。
- 实例的相关元数据,其为实体主机、虚拟机,还是诸如 Kubernetes Pod 或 Docker 容器等小型实体
- 这一元数据符合 Elastic Common Schema,所以能够与来自其他也使用 ECS 的来源的数据进行匹配。
举个例子,system.cpu
指标集中的 Metricbeat 文档就是下面这个样子。_source
对象的内联注释表示您可以从哪里获取有关该字段的更多信息:
- ECS 文档
- Metricbeat 文档
- system.cpu 指标集文档
注意:为便于理解,文档在 JSON 文件中添加了 # 注释。
{ "_index" : "metricbeat-8.0.0-2019.08.12-000001", "_type" : "_doc", "_id" : "vWm5hWwB6vlM0zxdF3Q5", "_score" :0.0, "_source" : { "@timestamp" :"2019-08-12T12:06:34.572Z", "ecs" : { # ECS metadata "version" :"1.0.1" }, "host" : { # ECS metadata "name" : "noether", "hostname" : "noether", "architecture" : "x86_64", "os" : { "kernel" :"4.15.0-55-generic", "codename" : "bionic", "platform" : "ubuntu", "version" :"18.04.3 LTS (Bionic Beaver)", "family" : "debian", "name" :"Ubuntu" }, "id" :"4e3eb308e7f24789b4ee0b6b873e5414", "containerized" : false }, "agent" : { # ECS metadata "ephemeral_id" :"7c725f8a-ac03-4f2d-a40c-3695a3591699", "hostname" : "noether", "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce", "version" :"8.0.0", "type" : "metricbeat" }, "event" : { # ECS metadata "dataset" : "system.cpu", "module" : "system", "duration" :725494 }, "metricset" : { # metricbeat metadata "name" : "cpu" }, "service" : { # metricbeat metadata "type" : "system" }, "system" : { # metricbeat time series data "cpu" : { "softirq" : { "pct" :0.0112 }, "steal" : { "pct" :0 }, "cores" :8, "irq" : { "pct" :0 }, "idle" : { "pct" :6.9141 }, "nice" : { "pct" :0 }, "user" : { "pct" :0.7672 }, "system" : { "pct" :0.3024 }, "iowait" : { "pct" :0.0051 }, "total" : { "pct" :1.0808 } } } } }
总结一下,在 Metricbeat 文档中,时序型数据和元数据混合在一起,所以您需拥有文档格式的具体相关知识才能准确检索出所需的内容。
然而,如果您想处理、分析或可视化时序型数据,这些数据通常应该为类似表格的形式,如下所示:
<series name> <timestamp> <value> <key-value pairs> system.cpu.user.pct 1565610800000 0.843 host.name=”noether” system.cpu.user.pct 1565610800000 0.951 host.name=”hilbert” system.cpu.user.pct 1565610810000 0.865 host.name=”noether” system.cpu.user.pct 1565610810000 0.793 host.name=”hilbert” system.cpu.user.pct 1565610820000 0.802 host.name=”noether” system.cpu.user.pct 1565610820000 0.679 host.name=”hilbert”
Elasticsearch 查询能帮助您通过编程方式以极其接近此类表格的形式检索时序性数据,下面的例子为您展示了操作过程。如想亲自体验查询过程,您需要一个 Elasticsearch 实例,还需要安装并运行 Metricbeat 以让其为 system.cpu
和 system.network
指标集传输数据。如需 Metricbeat 的简短介绍,请参看入门文档。
您可以从 Kibana 中的开发工具 (Dev Tools) 控制台运行所有查询。如之前并未用过,您可以查看 Kibana 控制台文档简单了解一下。请注意,您需要更改示例查询中的主机名。
我们假设您已按照默认配置将 Metricbeat 设置完毕。这即表示它每天会创建一个索引,而且这些索引的命名规则为“metricbeat-版本号-日期-计数”,例如 metricbeat-7.3.0-2019.08.06-000009
。要一次性查询所有这些索引,我们需要使用通配符:
示例查询:
GET metricbeat-*/_search
示例响应如下:
{ "took" :2, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : { "total" : { "value" :10000, "relation" : "gte" }, "max_score" :1.0, "hits" : [...] } }
很明显,该查询超出了 Elasticsearch 在单次查询中可返回文档的数量限制。这里省略了真实命中信息,但您可能希望滚动查询结果并与上面已加注释的文档进行比较。
根据所监测基础设施的规模,虽然可能会有海量的 Metricbeat 文档,但您很少需要从最开始(记录)的时间点查询时序型数据,所以我们开始时用一个日期范围,在本案例中是过去 5 分钟:
示例查询:
GET metricbeat-*/_search { "query": { "range": { "@timestamp": { "gte": "now-5m" } } } }
示例响应如下:
{ "took" :4, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" :0.0, "hits" : [...] } }
这个规模管理起来要容易得多。然而,运行此查询时所基于的系统仅有一个主机向其报告,所以在生产环境中,命中数仍然会很高。
如要检索特定主机的所有 CPU 数据,进行 Elasticsearch 查询时第一步原生尝试可能是针对 host.name 和指标集 system.cpu
添加筛选:
示例查询:
GET metricbeat-*/_search { "query": { "bool": { "filter": [ { "range": { "@timestamp": { "gte": "now-5m" } } }, { "bool": { "should": [ { "match_phrase": { "host.name": "noether" } }, { "match_phrase": { "event.dataset": "system.cpu" } } ] } } ] } } }
示例响应如下:
{ "took" :8, "timed_out" : false, "_shards" : { "total" :1, "successful" :1, "skipped" :0, "failed" :0 }, "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" :0.0, "hits" : [...] } }
此查询仍会返回大量文档,所有都包含 Metricbeat 发送的有关 system.cpu
指标集的完整数据。这个结果的用处并不大,原因如下。
首先,我们需要检索整个时间范围内的所有文档。一旦我们达到所配置的上限,Elasticsearch 将不会一次性返回这些结果;它会尝试对文档排序,这对我们的查询根本不适用;同时 Elasticsearch 返回结果时不会按时间戳进行排序。
第二,我们仅对每个文档中的一小部分内容感兴趣:时间戳、几个指标值,可能还有一些其他元数据字段。从 Elasticsearch 中返回全部的 _source
,然后再从查询结果中挑选数据,这种方法的效率很低。
解决这种问题的方法之一就是利用 Elasticsearch 聚合。
示例 1:CPU 百分比,已缩小采样
我们首先看一下日期直方图。日期直方图聚合将会为每个时间间隔返回一个值。返回的桶已按时间进行排序,而且用户可以指定间隔(又称桶大小)来匹配数据。在这个示例中,我们将间隔时长选为 10 秒,因为 Metricbeat 默认每 10 秒从系统模块发送一次数据。顶层的 size: 0
参数表示我们对实际命中结果不再感兴趣,而只对聚合感兴趣,所以不会返回任何文档。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "fixed_interval":"10s" } } } }
示例响应如下:
{ ..., "hits" : { "total" : { "value" :30, "relation" : "eq" }, "max_score" : null, "hits" : [ ] }, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:03:20.000Z", "key" :1565615000000, "doc_count" :1 }, { "key_as_string" :"2019-08-12T13:03:30.000Z", "key" :1565615010000, "doc_count" :1 }, { "key_as_string" :"2019-08-12T13:03:40.000Z", "key" :1565615020000, "doc_count" :1 }, ... ] } } }
对于每个桶,这会在 key
中返回时间戳,还会返回很有帮助的 key_as_string
(其中包含用户可读的日期时间字符串),以及桶中包含的文档数量。
这个案例的 doc_count
是 1,因为桶大小与 Metricbeat 的报告期间相匹配。如果没有其他信息,这一结果并没什么用处,所以为了看到真正的指标值,我们需要再添加一个聚合。在这一步,我们需要决定聚合类型——对数值而言,avg
、min
和 max
都是不错的选择——但由于我们每个桶只有一个文档,所以无论选择哪个都没什么影响。下面的示例便很好地展示了这一点,因为它针对指标 system.cpu.user.pct
在桶的 10 秒期间内的值返回了 avg
、min
和 max
聚合:
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "fixed_interval":"10s" }, "aggregations": { "myActualCpuUserMax": { "max": { "field": "system.cpu.user.pct" } }, "myActualCpuUserAvg": { "avg": { "field": "system.cpu.user.pct" } }, "myActualCpuUserMin": { "min": { "field": "system.cpu.user.pct" } } } } } }
示例响应如下:
{ ..., "hits" : {...}, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:12:40.000Z", "key" :1565615560000, "doc_count" :1, "myActualCpuUserMin" : { "value" :1.002 }, "myActualCpuUserAvg" : { "value" :1.002 }, "myActualCpuUserMax" : { "value" :1.002 } }, { "key_as_string" :"2019-08-12T13:12:50.000Z", "key" :1565615570000, "doc_count" :1, "myActualCpuUserMin" : { "value" :0.866 }, "myActualCpuUserAvg" : { "value" :0.866 }, "myActualCpuUserMax" : { "value" :0.866 } }, ... ] } } }
您可以看到,在每个桶中 myActualCpuUserMin
、myActualCpuUserAvg
和 myActualCpuUserMax
是一样的,所以如果需要检索按固定间隔报告的时序型数据的原始值,您可以使用日期直方图来实现。
然而,您多数情况下不会对每个单独的数据点感兴趣,如果每隔几秒便取一个测量值的话,则更是如此。出于很多目的,实际上更好的一种方法是拥有更粗粒度的数据:举个例子,如果一个可视化仅有限定数量的像素来展示时序型数据的变化,那么其在呈现时便会舍弃较细粒度的数据。
我们通常会缩小时序型数据的采样,直至其粒度与任何后续处理步骤的要求相符。在缩小采样的过程中,给定时间段内的多个数据点会缩减为一个点。在我们的服务器监测示例中,数据的测量频率为每 10 秒一次,但多数情况下,一分钟内所有值的平均值应该就可以。偶尔情况下,缩小采样的过程与日期直方图聚合的过程一模一样,前提是日期直方图聚合过程为每个桶找到多于一份文档,并且应用了正确的嵌套聚合。
下面的示例展示了在完整的 1 分钟桶内采用嵌套 avg
、min
和 max
聚合的日期直方图结果,给出了缩小采样的第一个示例。由于使用了 calendar_interval
,而未使用 fixed_interval
,所以此参数会将桶边界调整为整分钟。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myDownsampledCpuUserMax": { "max": { "field": "system.cpu.user.pct" } }, "myDownsampledCpuUserAvg": { "avg": { "field": "system.cpu.user.pct" } }, "myDownsampledCpuUserMin": { "min": { "field": "system.cpu.user.pct" } } } } } }
示例响应如下:
{ ..., "hits" : {...}, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:27:00.000Z", "key" :1565616420000, "doc_count" :4, "myDownsampledCpuUserMax" : { "value" :0.927 }, "myDownsampledCpuUserMin" : { "value" :0.6980000000000001 }, "myDownsampledCpuUserAvg" : { "value" :0.8512500000000001 } }, { "key_as_string" :"2019-08-12T13:28:00.000Z", "key" :1565616480000, "doc_count" :6, "myDownsampledCpuUserMax" : { "value" :0.838 }, "myDownsampledCpuUserMin" : { "value" :0.5670000000000001 }, "myDownsampledCpuUserAvg" : { "value" :0.7040000000000001 } }, ... ] } } }
如您所见,myActualCpuUserMin
、myActualCpuUserAvg
和 myActualCpuUserMax
现在的值不同,具体取决于所用的聚合。
缩小采样时采用哪种方法与指标息息相关。对于 CPU 百分比,一分钟内的 avg
聚合就可以,针对诸如队列时长和系统负载等指标,max
聚合则可能更为合适。
现在,还可以使用 Elasticsearch 来执行一些简单的代数运算,并计算原始数据中没有的时序型数据。假设我们针对 CPU 进行 avg
聚合,则我们的示例可加以优化以返回用户 CPU、系统 CPU,以及用户和系统总和除以 CPU 核数,命令如下:
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myDownsampledCpuUserAvg": { "avg": { "field": "system.cpu.user.pct" } }, "myDownsampledCpuSystemAvg": { "avg": { "field": "system.cpu.system.pct" } }, "myCpuCoresMax": { "max": { "field": "system.cpu.cores" } }, "myCalculatedCpu": { "bucket_script": { "buckets_path": { "user": "myDownsampledCpuUserAvg", "system": "myDownsampledCpuSystemAvg", "cores": "myCpuCoresMax" }, "script": { "source": "(params.user + params.system) / params.cores", "lang": "painless" } } } } } } }
示例响应如下:
{ ..., "hits" : {...}, "aggregations" : { "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:32:00.000Z", "key" :1565616720000, "doc_count" :2, "myDownsampledCpuSystemAvg" : { "value" :0.344 }, "myCpuCoresMax" : { "value" :8.0 }, "myDownsampledCpuUserAvg" : { "value" :0.8860000000000001 }, "myCalculatedCpu" : { "value" :0.15375 } }, { "key_as_string" :"2019-08-12T13:33:00.000Z", "key" :1565616780000, "doc_count" :6, "myDownsampledCpuSystemAvg" : { "value" :0.33416666666666667 }, "myCpuCoresMax" : { "value" :8.0 }, "myDownsampledCpuUserAvg" : { "value" :0.8895 }, "myCalculatedCpu" : { "value" :0.15295833333333334 } }, ... ] } } }
示例 2:网络流量——多值聚合和导数聚合
关于 Elasticsearch 聚合在处理时序型数据时的巨大作用,我们还可以举一个更加详细的关于 system.network
指标集的示例。system.network
指标集文档中的相关部分如下所示:
{ ... "system": { "network": { "in": { "bytes":37904869172, "dropped":32, "errors":0, "packets":32143403 }, "name": "wlp4s0", "out": { "bytes":6299331926, "dropped":0, "errors":0, "packets":13362703 } } } ... }
Metricbeat 会为系统中存在的每个网络接口发送一份文档。这些文档的时间戳相同,但是 system.network.name
字段的值不同,每个网络接口都有一个值。
任何进一步的聚合都需要按照接口完成,所以我们针对 system.network.name
字段将上个例子中顶层的日期直方图聚合更改为多值聚合。
请注意如要此方法奏效,需要将所聚合的字段映射为关键字字段。如果您使用 Metricbeat 提供的默认索引模板,则此映射可能已为您设置完毕。如未设置,Metricbeat 模板文档页面就您需要完成的操作给出了简短描述。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myNetworkInterfaces": { "terms": { "field": "system.network.name", "size":50 }, "aggs": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" } } } } } }
示例响应如下:
{ ..., "hits" : {...}, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "enp0s31f6", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "lo", "doc_count" :29, "myDateHistogram" : { "buckets" : [...] } }, { "key" : "wlp61s0", "doc_count" :29, "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:39:00.000Z", "key" :1565617140000, "doc_count" :1 }, { "key_as_string" :"2019-08-12T13:40:00.000Z", "key" :1565617200000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:41:00.000Z", "key" :1565617260000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:42:00.000Z", "key" :1565617320000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:43:00.000Z", "key" :1565617380000, "doc_count" :6 }, { "key_as_string" :"2019-08-12T13:44:00.000Z", "key" :1565617440000, "doc_count" :4 } ] } }, ... ] } } }
和 CPU 示例一样,不使用嵌套聚合的话,日期直方图聚合仅会返回用处不太大的 doc_count
。
字节字段包含单调递增值。这些字段的值包括自机器上次启动以来所发送或接收的字节数,所以每次测量时该值都会增大。在这个案例中,正确的嵌套聚合是 max
,所以缩小采样后的值要包括最高值,也就是桶间隔期间所得到的最新测量值。
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myNetworkInterfaces": { "terms": { "field": "system.network.name", "size":50 }, "aggs": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myNetworkInBytesMax": { "max": { "field": "system.network.in.bytes" } }, "myNetworkOutBytesMax": { "max": { "field": "system.network.out.bytes" } } } } } } } }
And the following example response:
{ ..., "hits" : {...}, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", ... }, { "key" : "enp0s31f6", ... }, { "key" : "lo", ... }, { "key" : "wlp61s0", "doc_count" :30, "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T13:50:00.000Z", "key" :1565617800000, "doc_count" :2, "myNetworkInBytesMax" : { "value" :2.991659837E9 }, "myNetworkOutBytesMax" : { "value" :5.46578365E8 } }, { "key_as_string" :"2019-08-12T13:51:00.000Z", "key" :1565617860000, "doc_count" :6, "myNetworkInBytesMax" : { "value" :2.992027006E9 }, "myNetworkOutBytesMax" : { "value" :5.46791988E8 }, "myNetworkInBytesPerSecond" : { "value" :367169.0, "normalized_value" :6119.483333333334 }, "myNetworkoutBytesPerSecond" : { "value" :213623.0, "normalized_value" :3560.383333333333 } }, ... ] } }, ... ] } } }
如要从单调递增计数中获得每秒的字节速率,需要使用导数聚合。当此聚合收到所传递的可选参数 unit
时,会在 normalized_value
字段中返回理想的每单位的数值:
示例查询:
GET metricbeat-*/_search { "query": {...}, # same as above "size":0, "aggregations": { "myNetworkInterfaces": { "terms": { "field": "system.network.name", "size":50 }, "aggs": { "myDateHistogram": { "date_histogram": { "field": "@timestamp", "calendar_interval":"1m" }, "aggregations": { "myNetworkInBytesMax": { "max": { "field": "system.network.in.bytes" } }, "myNetworkInBytesPerSecond": { "derivative": { "buckets_path": "myNetworkInBytesMax", "unit":"1s" } }, "myNetworkOutBytesMax": { "max": { "field": "system.network.out.bytes" } }, "myNetworkoutBytesPerSecond": { "derivative": { "buckets_path": "myNetworkOutBytesMax", "unit":"1s" } } } } } } } }
示例响应如下:
{ ..., "hits" : {...}, "aggregations" : { "myNetworkInterfaces" : { "doc_count_error_upper_bound" :0, "sum_other_doc_count" :0, "buckets" : [ { "key" : "docker0", ... }, { "key" : "enp0s31f6", ... }, { "key" : "lo", ... }, { "key" : "wlp61s0", "doc_count" :30, "myDateHistogram" : { "buckets" : [ { "key_as_string" :"2019-08-12T14:07:00.000Z", "key" :1565618820000, "doc_count" :4, "myNetworkInBytesMax" : { "value" :3.030494669E9 }, "myNetworkOutBytesMax" : { "value" :5.56084749E8 } }, { "key_as_string" :"2019-08-12T14:08:00.000Z", "key" :1565618880000, "doc_count" :6, "myNetworkInBytesMax" : { "value" :3.033793744E9 }, "myNetworkOutBytesMax" : { "value" :5.56323416E8 }, "myNetworkInBytesPerSecond" : { "value" :3299075.0, "normalized_value" :54984.583333333336 }, "myNetworkoutBytesPerSecond" : { "value" :238667.0, "normalized_value" :3977.7833333333333 } }, { "key_as_string" :"2019-08-12T14:09:00.000Z", "key" :1565618940000, "doc_count" :6, "myNetworkInBytesMax" : { "value" :3.037045046E9 }, "myNetworkOutBytesMax" : { "value" :5.56566282E8 }, "myNetworkInBytesPerSecond" : { "value" :3251302.0, "normalized_value" :54188.36666666667 }, "myNetworkoutBytesPerSecond" : { "value" :242866.0, "normalized_value" :4047.766666666667 } }, ... ] } }, ... ] } } }
您可在自己的集群上尝试所有命令,如果您还没有集群,可以免费试用基于 Elastic Cloud 的 Elasticsearch Service 并快速部署一个集群,您还可以下载 Elastic Stack 的默认分发包。使用 Metricbeat 开始从您的系统发送数据吧,畅享查询的乐趣!