
# Querying and aggregating time series data in Elasticsearch

Elasticsearch began as a search engine, building its search indices on the Apache Lucene library. From that starting point, however, Elasticsearch has evolved into a highly performant, clusterable, scalable datastore. Its index format still reflects the history of its beginnings, but it is used by all kinds of users for all kinds of purposes.

One of these purposes is to store, process, and retrieve time series data. Time series data is characterized by every data point being associated with a precise timestamp. Most often, a data point represents some kind of measurement taken at a specific point in time. This can be a stock price, a scientific observation, or the load of a server.

While several database implementations specialize in handling time series data, there is no common format in which time series are stored and queried, and in principle any database engine can be used to handle time series data.

A time series database entry, in abstract form, consists of

• the series name
• a timestamp
• a value
• a set of key-value pairs, or tags, containing further information about the series

In a server monitoring use case, there will always be one key-value pair specifying which host a time series belongs to, but any additional information can be attached and later used to request metrics for only a specific set of hosts: for example, hosts running a particular service, hosts belonging to a production environment, or instances running on a certain cloud provider.

To make this more practical, let’s use Metricbeat data as an example of how you can use Elasticsearch queries to sift out particular time series information from your data.

Each Metricbeat document contains the following information:

• A timestamp
• The actual time series data
  • For Metricbeat, the format of this part of the document is described in the Metricbeat documentation. To learn more about the cpu metricset of the system module, see the metricset documentation.
• Metadata about the metrics contained in the document
  • Metricbeat uses the ECS fields event.module and event.dataset to specify which Metricbeat module created the document and which metricset it contains. This can be helpful for finding out which metrics are present in the first place, before you try to retrieve time series data.
• Metadata about the instance, whether it is a physical host, a virtual machine, or a smaller entity such as a Kubernetes pod or a Docker container
  • This metadata follows the Elastic Common Schema (ECS), so it can be matched with data from other sources that also use ECS.

As an example, this is what a Metricbeat document from the system.cpu metricset looks like. The inline comments on the _source object indicate where you can find more information about the field:

{
  "_index" : "metricbeat-8.0.0-2019.08.12-000001",
  "_type" : "_doc",
  "_id" : "vWm5hWwB6vlM0zxdF3Q5",
  "_score" : 0.0,
  "_source" : {
    "@timestamp" : "2019-08-12T12:06:34.572Z",
    "ecs" : { # ECS metadata
      "version" : "1.0.1"
    },
    "host" : { # ECS metadata
      "name" : "noether",
      "hostname" : "noether",
      "architecture" : "x86_64",
      "os" : {
        "kernel" : "4.15.0-55-generic",
        "codename" : "bionic",
        "platform" : "ubuntu",
        "version" : "18.04.3 LTS (Bionic Beaver)",
        "family" : "debian",
        "name" : "Ubuntu"
      },
      "id" : "4e3eb308e7f24789b4ee0b6b873e5414",
      "containerized" : false
    },
    "agent" : { # ECS metadata
      "ephemeral_id" : "7c725f8a-ac03-4f2d-a40c-3695a3591699",
      "hostname" : "noether",
      "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce",
      "version" : "8.0.0",
      "type" : "metricbeat"
    },
    "event" : { # ECS metadata
      "dataset" : "system.cpu",
      "module" : "system",
      "duration" : 725494
    },
    "metricset" : { # metricbeat metadata
      "name" : "cpu"
    },
    "service" : { # metricbeat metadata
      "type" : "system"
    },
    "system" : { # metricbeat time series data
      "cpu" : {
        "softirq" : {
          "pct" : 0.0112
        },
        "steal" : {
          "pct" : 0
        },
        "cores" : 8,
        "irq" : {
          "pct" : 0
        },
        "idle" : {
          "pct" : 6.9141
        },
        "nice" : {
          "pct" : 0
        },
        "user" : {
          "pct" : 0.7672
        },
        "system" : {
          "pct" : 0.3024
        },
        "iowait" : {
          "pct" : 0.0051
        },
        "total" : {
          "pct" : 1.0808
        }
      }
    }
  }
}


To summarize, within a Metricbeat document, time series data and metadata are mixed, and you need specific knowledge about the document format to retrieve exactly what you need.

In contrast, if you want to process, analyze, or visualize time series data, the data typically should be in a table-like format like this:

<series name> <timestamp> <value> <key-value pairs>
system.cpu.user.pct 1565610800000 0.843 host.name="noether"
system.cpu.user.pct 1565610800000 0.951 host.name="hilbert"
system.cpu.user.pct 1565610810000 0.865 host.name="noether"
system.cpu.user.pct 1565610810000 0.793 host.name="hilbert"
system.cpu.user.pct 1565610820000 0.802 host.name="noether"
system.cpu.user.pct 1565610820000 0.679 host.name="hilbert"
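
The reshaping from nested document to table rows is mechanical. As a small illustration, here is a Python sketch (the helper name flatten is our own, not part of any Elastic tooling) that turns the time series part of the example Metricbeat document above into rows of this form:

```python
def flatten(prefix, obj):
    """Recursively walk a nested dict, yielding (dotted.name, value) pairs."""
    for key, value in obj.items():
        name = f"{prefix}.{key}"
        if isinstance(value, dict):
            yield from flatten(name, value)
        else:
            yield name, value

# A trimmed-down _source, using values from the example document above.
source = {
    "@timestamp": "2019-08-12T12:06:34.572Z",
    "host": {"name": "noether"},
    "system": {"cpu": {"user": {"pct": 0.7672}, "system": {"pct": 0.3024}}},
}

tags = 'host.name="{}"'.format(source["host"]["name"])
for series, value in flatten("system", source["system"]):
    print(series, source["@timestamp"], value, tags)
# system.cpu.user.pct 2019-08-12T12:06:34.572Z 0.7672 host.name="noether"
# system.cpu.system.pct 2019-08-12T12:06:34.572Z 0.3024 host.name="noether"
```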


Elasticsearch queries can help you programmatically retrieve time series data in a format that is very close to such a table, and the following examples show how to do this. If you want to experiment with the queries yourself, you will need an Elasticsearch instance and a running Metricbeat installation that is shipping data for the system.cpu and system.network metricsets. For a short introduction to Metricbeat, see the getting started documentation.

You can run all queries from Kibana’s Dev Tools console. If you haven’t used it before, you can find a short introduction in the Kibana console docs. Note that you will need to change the hostname in the example queries.

We assume that Metricbeat is set up following the default configuration. This means it will create one index per day, and these indices will be named metricbeat-VERSION-DATE-COUNTER, e.g. metricbeat-7.3.0-2019.08.06-000009. To query all these indices at once, we'll use a wildcard:

Sample query:

GET metricbeat-*/_search


And the following example response:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 10000,
      "relation" : "gte"
    },
    "max_score" : 1.0,
    "hits" : [...]
  }
}


Naturally, this query exceeds the limit on the number of documents Elasticsearch returns in a single response (10,000 by default, which is why hits.total reports a value of 10000 with the relation gte, i.e. "greater than or equal to"). The actual hits have been omitted here, but you might want to scroll through your results and compare them with the annotated document above.

Depending on the size of your monitored infrastructure there can be a huge number of Metricbeat documents, but it is rare that you need a time series from the beginning of (recorded) time, so let’s start with a date range, in this case, the last five minutes:

Sample query:

GET metricbeat-*/_search
{
  "query": {
    "range": {
      "@timestamp": {
        "gte": "now-5m"
      }
    }
  }
}


And the following example response:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 30,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [...]
  }
}


This is now a much more manageable size. However, the system this query is run on has only one host reporting to it, so in a production environment the number of hits will still be very high.

To retrieve all CPU data for a specific host, a first naive attempt at an Elasticsearch query might be to add filters for the host.name and the metricset system.cpu:

Sample query:

GET metricbeat-*/_search
{
  "query": {
    "bool": {
      "filter": [
        {
          "range": {
            "@timestamp": {
              "gte": "now-5m"
            }
          }
        },
        {
          "match_phrase": {
            "host.name": "noether"
          }
        },
        {
          "match_phrase": {
            "event.dataset": "system.cpu"
          }
        }
      ]
    }
  }
}


And the following example response:

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 30,
      "relation" : "eq"
    },
    "max_score" : 0.0,
    "hits" : [...]
  }
}


This query still returns a large number of documents, all containing the full data that is sent by Metricbeat for the system.cpu metricset. It is not a very useful result, for a few reasons.

First, we would need to retrieve all documents over the whole time range. As soon as we reach the configured limit, Elasticsearch will not return them in one go. It will also try to rank the documents by relevance, which makes no sense for our query, and it will not return them sorted by timestamp.

Second, we are only interested in a small part of each document: the timestamp, a few metric values, and possibly some additional metadata fields. Returning the whole _source from Elasticsearch and then picking out the data from the query result is very inefficient.

One approach to solve this is to use Elasticsearch aggregations.

### Example 1: CPU percentage, downsampled

For a start, let’s look at date histograms. A date histogram aggregation returns one value per time interval. The returned buckets are already ordered by time, and the interval, or bucket size, can be specified to match the data. In this example, 10 seconds was chosen as the interval size because Metricbeat sends data from the system module every 10 seconds by default. The top-level size: 0 parameter specifies that we are no longer interested in the actual hits, but only in the aggregation, so no documents will be returned.

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "10s"
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {
    "total" : {
      "value" : 30,
      "relation" : "eq"
    },
    "max_score" : null,
    "hits" : [ ]
  },
  "aggregations" : {
    "myDateHistogram" : {
      "buckets" : [
        {
          "key_as_string" : "2019-08-12T13:03:20.000Z",
          "key" : 1565615000000,
          "doc_count" : 1
        },
        {
          "key_as_string" : "2019-08-12T13:03:30.000Z",
          "key" : 1565615010000,
          "doc_count" : 1
        },
        {
          "key_as_string" : "2019-08-12T13:03:40.000Z",
          "key" : 1565615020000,
          "doc_count" : 1
        },
        ...
      ]
    }
  }
}


For each bucket, this returns the timestamp in the key, a helpful key_as_string containing a human-readable datetime string, and the number of documents that ended up in the bucket.

This doc_count is 1, because the bucket size matches the reporting period of Metricbeat. It is otherwise not very useful, so to see actual metric values, we’ll add another aggregation. At this step we need to decide what this aggregation will be — for numeric values, avg, min, and max are good candidates — but as long as we only have one document per bucket, it doesn’t actually matter which one we choose. The following example illustrates this by returning avg, min, and max aggregations on the values of the metric system.cpu.user.pct within buckets 10 seconds long:

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "fixed_interval": "10s"
      },
      "aggregations": {
        "myActualCpuUserMax": {
          "max": {
            "field": "system.cpu.user.pct"
          }
        },
        "myActualCpuUserAvg": {
          "avg": {
            "field": "system.cpu.user.pct"
          }
        },
        "myActualCpuUserMin": {
          "min": {
            "field": "system.cpu.user.pct"
          }
        }
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {...},
  "aggregations" : {
    "myDateHistogram" : {
      "buckets" : [
        {
          "key_as_string" : "2019-08-12T13:12:40.000Z",
          "key" : 1565615560000,
          "doc_count" : 1,
          "myActualCpuUserMin" : {
            "value" : 1.002
          },
          "myActualCpuUserAvg" : {
            "value" : 1.002
          },
          "myActualCpuUserMax" : {
            "value" : 1.002
          }
        },
        {
          "key_as_string" : "2019-08-12T13:12:50.000Z",
          "key" : 1565615570000,
          "doc_count" : 1,
          "myActualCpuUserMin" : {
            "value" : 0.866
          },
          "myActualCpuUserAvg" : {
            "value" : 0.866
          },
          "myActualCpuUserMax" : {
            "value" : 0.866
          }
        },
        ...
      ]
    }
  }
}


As you can see, the values for myActualCpuUserMin, myActualCpuUserAvg, and myActualCpuUserMax are the same in each bucket, so if you need to retrieve the raw values of a time series that has been reported at regular intervals, you can use a date histogram to achieve this.

Most of the time, however, you won’t be interested in every single data point, especially when measurements are taken every few seconds. For many purposes it is actually better to have data with a coarser granularity: for example, a visualization only has a limited amount of pixels to display variations in a time series, so data with a higher granularity will be thrown away at rendering time.

Time series data is often downsampled to a granularity that matches the requirements of whatever processing step that comes next. In downsampling, multiple data points within a given time period are reduced to a single one. In our server monitoring example, the data may be measured every 10 seconds, but for most purposes the average of all values within one minute would be just fine. Incidentally, downsampling is exactly what a date histogram aggregation does when it finds more than one document per bucket, and the correct nested aggregations are used.
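
Conceptually, downsampling with an average is simple. A minimal Python sketch of the idea (illustrative only; this is not how Elasticsearch works internally):

```python
from collections import defaultdict

def downsample_avg(points, bucket_ms=60_000):
    """Reduce (timestamp_ms, value) points to one average per time bucket."""
    buckets = defaultdict(list)
    for ts, value in points:
        buckets[ts - ts % bucket_ms].append(value)  # align to bucket start
    return {ts: sum(vs) / len(vs) for ts, vs in sorted(buckets.items())}

# Three 10-second samples that fall into the same 1-minute bucket.
points = [(1565610800000, 0.843), (1565610810000, 0.865), (1565610820000, 0.802)]
print(downsample_avg(points))  # one bucket, holding the average of the three
```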

The following example shows the result of a date histogram with nested avg, min, and max aggregations over a full 1-minute bucket, giving a first example of downsampling. The use of calendar_interval instead of fixed_interval aligns the bucket boundaries to full minutes.

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1m"
      },
      "aggregations": {
        "myDownsampledCpuUserMax": {
          "max": {
            "field": "system.cpu.user.pct"
          }
        },
        "myDownsampledCpuUserAvg": {
          "avg": {
            "field": "system.cpu.user.pct"
          }
        },
        "myDownsampledCpuUserMin": {
          "min": {
            "field": "system.cpu.user.pct"
          }
        }
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {...},
  "aggregations" : {
    "myDateHistogram" : {
      "buckets" : [
        {
          "key_as_string" : "2019-08-12T13:27:00.000Z",
          "key" : 1565616420000,
          "doc_count" : 4,
          "myDownsampledCpuUserMax" : {
            "value" : 0.927
          },
          "myDownsampledCpuUserMin" : {
            "value" : 0.6980000000000001
          },
          "myDownsampledCpuUserAvg" : {
            "value" : 0.8512500000000001
          }
        },
        {
          "key_as_string" : "2019-08-12T13:28:00.000Z",
          "key" : 1565616480000,
          "doc_count" : 6,
          "myDownsampledCpuUserMax" : {
            "value" : 0.838
          },
          "myDownsampledCpuUserMin" : {
            "value" : 0.5670000000000001
          },
          "myDownsampledCpuUserAvg" : {
            "value" : 0.7040000000000001
          }
        },
        ...
      ]
    }
  }
}


As you can see, the values for myDownsampledCpuUserMin, myDownsampledCpuUserAvg, and myDownsampledCpuUserMax are now different and depend on the aggregation used.

Which method is used for downsampling depends on the metric. For CPU percentage, the avg aggregation over one minute is fine; for metrics like queue lengths or system load, a max aggregation might be more appropriate.

At this point it is also possible to use Elasticsearch to perform some basic arithmetic and calculate a time series that is not in the original data. Assuming we work with the avg aggregation for CPU, our example could be enhanced to return user CPU, system CPU, and the sum of user + system divided by CPU cores, like this:

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myDateHistogram": {
      "date_histogram": {
        "field": "@timestamp",
        "calendar_interval": "1m"
      },
      "aggregations": {
        "myDownsampledCpuUserAvg": {
          "avg": {
            "field": "system.cpu.user.pct"
          }
        },
        "myDownsampledCpuSystemAvg": {
          "avg": {
            "field": "system.cpu.system.pct"
          }
        },
        "myCpuCoresMax": {
          "max": {
            "field": "system.cpu.cores"
          }
        },
        "myCalculatedCpu": {
          "bucket_script": {
            "buckets_path": {
              "user": "myDownsampledCpuUserAvg",
              "system": "myDownsampledCpuSystemAvg",
              "cores": "myCpuCoresMax"
            },
            "script": {
              "source": "(params.user + params.system) / params.cores",
              "lang": "painless"
            }
          }
        }
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {...},
  "aggregations" : {
    "myDateHistogram" : {
      "buckets" : [
        {
          "key_as_string" : "2019-08-12T13:32:00.000Z",
          "key" : 1565616720000,
          "doc_count" : 2,
          "myDownsampledCpuSystemAvg" : {
            "value" : 0.344
          },
          "myCpuCoresMax" : {
            "value" : 8.0
          },
          "myDownsampledCpuUserAvg" : {
            "value" : 0.8860000000000001
          },
          "myCalculatedCpu" : {
            "value" : 0.15375
          }
        },
        {
          "key_as_string" : "2019-08-12T13:33:00.000Z",
          "key" : 1565616780000,
          "doc_count" : 6,
          "myDownsampledCpuSystemAvg" : {
            "value" : 0.33416666666666667
          },
          "myCpuCoresMax" : {
            "value" : 8.0
          },
          "myDownsampledCpuUserAvg" : {
            "value" : 0.8895
          },
          "myCalculatedCpu" : {
            "value" : 0.15295833333333334
          }
        },
        ...
      ]
    }
  }
}
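
The bucket_script arithmetic is easy to sanity-check by hand against the first bucket of this response:

```python
# Values copied from the first bucket of the example response.
user_avg = 0.8860000000000001  # myDownsampledCpuUserAvg
system_avg = 0.344             # myDownsampledCpuSystemAvg
cores = 8.0                    # myCpuCoresMax

calculated = (user_avg + system_avg) / cores
print(calculated)  # ~0.15375, matching myCalculatedCpu
```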


### Example 2: Network traffic, using terms and derivative aggregations

A slightly more elaborate example of how useful Elasticsearch aggregations can be for time series data is the system.network metricset. The relevant part of a system.network metricset document looks like this:

{
  ...
  "system": {
    "network": {
      "in": {
        "bytes": 37904869172,
        "dropped": 32,
        "errors": 0,
        "packets": 32143403
      },
      "name": "wlp4s0",
      "out": {
        "bytes": 6299331926,
        "dropped": 0,
        "errors": 0,
        "packets": 13362703
      }
    }
  }
  ...
}


Metricbeat will send one document for every network interface present on the system. These documents will have the same timestamp, but different values for the field system.network.name, one per network interface.

Any further aggregation that is done needs to be done per interface, so we change the top-level date histogram aggregation of the previous examples to a terms aggregation over the field system.network.name.

Please note that for this to work, the field being aggregated over needs to be mapped as a keyword field. If you work with the default index template provided by Metricbeat, this mapping has already been set up for you. If not, the Metricbeat templates doc page has a short description of what you need to do.

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myNetworkInterfaces": {
      "terms": {
        "field": "system.network.name",
        "size": 50
      },
      "aggs": {
        "myDateHistogram": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1m"
          }
        }
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {...},
  "aggregations" : {
    "myNetworkInterfaces" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "docker0",
          "doc_count" : 29,
          "myDateHistogram" : {
            "buckets" : [...]
          }
        },
        {
          "key" : "enp0s31f6",
          "doc_count" : 29,
          "myDateHistogram" : {
            "buckets" : [...]
          }
        },
        {
          "key" : "lo",
          "doc_count" : 29,
          "myDateHistogram" : {
            "buckets" : [...]
          }
        },
        {
          "key" : "wlp61s0",
          "doc_count" : 29,
          "myDateHistogram" : {
            "buckets" : [
              {
                "key_as_string" : "2019-08-12T13:39:00.000Z",
                "key" : 1565617140000,
                "doc_count" : 1
              },
              {
                "key_as_string" : "2019-08-12T13:40:00.000Z",
                "key" : 1565617200000,
                "doc_count" : 6
              },
              {
                "key_as_string" : "2019-08-12T13:41:00.000Z",
                "key" : 1565617260000,
                "doc_count" : 6
              },
              {
                "key_as_string" : "2019-08-12T13:42:00.000Z",
                "key" : 1565617320000,
                "doc_count" : 6
              },
              {
                "key_as_string" : "2019-08-12T13:43:00.000Z",
                "key" : 1565617380000,
                "doc_count" : 6
              },
              {
                "key_as_string" : "2019-08-12T13:44:00.000Z",
                "key" : 1565617440000,
                "doc_count" : 4
              }
            ]
          }
        },
        ...
      ]
    }
  }
}


As in the CPU example, without a nested aggregation the date histogram aggregation only returns the doc_count, which is not very useful.

The bytes fields contain monotonically increasing values: the number of bytes sent or received since the last time the machine was started, so the value grows with every measurement. In this case, the correct nested aggregation is max, so that the downsampled value will contain the highest, and hence latest, measurement taken during the bucket interval.

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myNetworkInterfaces": {
      "terms": {
        "field": "system.network.name",
        "size": 50
      },
      "aggs": {
        "myDateHistogram": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1m"
          },
          "aggregations": {
            "myNetworkInBytesMax": {
              "max": {
                "field": "system.network.in.bytes"
              }
            },
            "myNetworkOutBytesMax": {
              "max": {
                "field": "system.network.out.bytes"
              }
            }
          }
        }
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {...},
  "aggregations" : {
    "myNetworkInterfaces" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "docker0",
          ...
        },
        {
          "key" : "enp0s31f6",
          ...
        },
        {
          "key" : "lo",
          ...
        },
        {
          "key" : "wlp61s0",
          "doc_count" : 30,
          "myDateHistogram" : {
            "buckets" : [
              {
                "key_as_string" : "2019-08-12T13:50:00.000Z",
                "key" : 1565617800000,
                "doc_count" : 2,
                "myNetworkInBytesMax" : {
                  "value" : 2.991659837E9
                },
                "myNetworkOutBytesMax" : {
                  "value" : 5.46578365E8
                }
              },
              {
                "key_as_string" : "2019-08-12T13:51:00.000Z",
                "key" : 1565617860000,
                "doc_count" : 6,
                "myNetworkInBytesMax" : {
                  "value" : 2.992027006E9
                },
                "myNetworkOutBytesMax" : {
                  "value" : 5.46791988E8
                }
              },
              ...
            ]
          }
        },
        ...
      ]
    }
  }
}


To get a rate of bytes per second from the monotonically increasing counter, a derivative aggregation can be used. When this aggregation is passed the optional parameter unit, it returns the desired value per unit in the normalized_value field:

Sample query:

GET metricbeat-*/_search
{
  "query": {...},  # same as above
  "size": 0,
  "aggregations": {
    "myNetworkInterfaces": {
      "terms": {
        "field": "system.network.name",
        "size": 50
      },
      "aggs": {
        "myDateHistogram": {
          "date_histogram": {
            "field": "@timestamp",
            "calendar_interval": "1m"
          },
          "aggregations": {
            "myNetworkInBytesMax": {
              "max": {
                "field": "system.network.in.bytes"
              }
            },
            "myNetworkInBytesPerSecond": {
              "derivative": {
                "buckets_path": "myNetworkInBytesMax",
                "unit": "1s"
              }
            },
            "myNetworkOutBytesMax": {
              "max": {
                "field": "system.network.out.bytes"
              }
            },
            "myNetworkoutBytesPerSecond": {
              "derivative": {
                "buckets_path": "myNetworkOutBytesMax",
                "unit": "1s"
              }
            }
          }
        }
      }
    }
  }
}


And the following example response:

{
  ...,
  "hits" : {...},
  "aggregations" : {
    "myNetworkInterfaces" : {
      "doc_count_error_upper_bound" : 0,
      "sum_other_doc_count" : 0,
      "buckets" : [
        {
          "key" : "docker0",
          ...
        },
        {
          "key" : "enp0s31f6",
          ...
        },
        {
          "key" : "lo",
          ...
        },
        {
          "key" : "wlp61s0",
          "doc_count" : 30,
          "myDateHistogram" : {
            "buckets" : [
              {
                "key_as_string" : "2019-08-12T14:07:00.000Z",
                "key" : 1565618820000,
                "doc_count" : 4,
                "myNetworkInBytesMax" : {
                  "value" : 3.030494669E9
                },
                "myNetworkOutBytesMax" : {
                  "value" : 5.56084749E8
                }
              },
              {
                "key_as_string" : "2019-08-12T14:08:00.000Z",
                "key" : 1565618880000,
                "doc_count" : 6,
                "myNetworkInBytesMax" : {
                  "value" : 3.033793744E9
                },
                "myNetworkOutBytesMax" : {
                  "value" : 5.56323416E8
                },
                "myNetworkInBytesPerSecond" : {
                  "value" : 3299075.0,
                  "normalized_value" : 54984.583333333336
                },
                "myNetworkoutBytesPerSecond" : {
                  "value" : 238667.0,
                  "normalized_value" : 3977.7833333333333
                }
              },
              {
                "key_as_string" : "2019-08-12T14:09:00.000Z",
                "key" : 1565618940000,
                "doc_count" : 6,
                "myNetworkInBytesMax" : {
                  "value" : 3.037045046E9
                },
                "myNetworkOutBytesMax" : {
                  "value" : 5.56566282E8
                },
                "myNetworkInBytesPerSecond" : {
                  "value" : 3251302.0,
                  "normalized_value" : 54188.36666666667
                },
                "myNetworkoutBytesPerSecond" : {
                  "value" : 242866.0,
                  "normalized_value" : 4047.766666666667
                }
              },
              ...
            ]
          }
        },
        ...
      ]
    }
  }
}
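
Using the in-bytes maxima of the 14:07 and 14:08 buckets above, the derivative's value and normalized_value can be reproduced in a few lines of Python:

```python
# Successive per-bucket maxima of the monotonically increasing byte counter,
# copied from the example response.
prev_max = 3.030494669e9  # myNetworkInBytesMax at 14:07
curr_max = 3.033793744e9  # myNetworkInBytesMax at 14:08
bucket_seconds = 60       # one bucket per calendar minute

derivative = curr_max - prev_max          # the derivative's "value"
per_second = derivative / bucket_seconds  # the "normalized_value" for unit "1s"
print(derivative, per_second)  # 3299075.0 and ~54984.58, as in the response
```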


You can try all of this on your own cluster, or if you don't already have a cluster, you can spin up a free trial of the Elasticsearch Service on Elastic Cloud, or download the default distribution of the Elastic Stack. Start sending data from your systems with Metricbeat, and query away!
