Búsqueda y agregación de datos temporales en Elasticsearch

Históricamente, Elasticsearch es un motor de búsqueda que mantiene sus índices de búsqueda en una base de datos de Lucene. Sin embargo, Elasticsearch en sí ha evolucionado desde esos comienzos a un almacén de datos muy eficiente, escalable y capaz de formar clusters. Su formato de índices aún refleja la historia de sus inicios, pero lo usan todo tipo de usuarios para todo tipo de fines.

Uno de estos fines es almacenar, procesar y recuperar datos temporales. Los datos temporales se caracterizan por cada punto de datos que se asocia con una marca de tiempo precisa. Muy a menudo, un punto de datos representa algún tipo de medición tomada en un momento específico. Puede ser un precio de acciones, una observación científica o la carga de un servidor.

Si bien existen varias implementaciones de base de datos especializadas en la administración de datos temporales, no hay un formato común en el cual las series temporales se almacenan y consultan, y en teoría cualquier motor de bases de datos puede usarse para administrar datos temporales.

Una entrada de base de datos de series temporales, en forma abstracta, consiste en lo siguiente:

  • El nombre de la serie
  • Una marca de tiempo
  • Un valor
  • Un conjunto de pares de valores clave, o etiquetas, con más información sobre la serie

En un caso de uso de monitoreo de servidor, siempre habrá un par de valores clave para especificar a qué serie temporal pertenece un host, pero la información adicional que se puede agregar puede ser cualquier cosa, y luego puede usarse solo para solicitar métricas sobre un conjunto de hosts específico. Los ejemplos serían hosts que ejecutan servicios específicos, solo aquellos que pertenecen a un entorno de producción, o instancias que se ejecutan en un Proveedor Cloud determinado.

Para hacerlo más práctico, usemos los datos de Metricbeat como ejemplo de cómo puedes usar las búsquedas de Elasticsearch para tamizar información de series temporales en particular a partir de tus datos.

Cada documento de Metricbeat contiene la información siguiente:

  • Una marca de tiempo
  • Datos temporales reales
    • Para Metricbeat, el formato de esta parte del documento se describe en la documentación de Metricbeat. Para obtener más información sobre el conjunto de métricas cpu del módulo system, consulta la documentación sobre conjuntos de métricas.
  • Metadatos sobre las propias métricas que se encuentran en un documento. Metricbeat usa los campos de ECS event.module y event.dataset para especificar cuál módulo de Metricbeat creó el documento y cuál conjunto de métricas se encuentra en el documento.
    • Esto puede ser útil para conocer las métricas presentes desde el principio, antes de que intentes recuperar datos temporales.
  • Metadatos sobre la instancia, si es un host físico, una máquina virtual o una entidad más pequeña como un pod de Kubernetes o un contenedor docker.
    • Estos metadatos se ajustan aElastic Common Schema para poder combinarse con datos de otras fuentes que también usen ECS.

A modo de ejemplo, así es como se ve un documento de Metricbeat del conjunto de métricas system.cpu. Los comentarios en línea en el objeto _source indican dónde puedes encontrar más información sobre el campo:

Nota: Los documentos incluyen comentarios introducidos por # en el JSON para fines de legibilidad.

{ 
  "_index" : "metricbeat-8.0.0-2019.08.12-000001", 
  "_type" : "_doc", 
  "_id" : "vWm5hWwB6vlM0zxdF3Q5", 
  "_score" : 0.0, 
  "_source" : { 
    "@timestamp": "2019-08-12T12:06:34.572Z", 
    "ecs" : { # ECS metadata 
      "version" : "1.0.1" 
    }, 
    "host" : { # ECS metadata 
      "name" : "noether", 
      "hostname" : "noether", 
      "architecture" : "x86_64", 
      "os" : { 
        "kernel" : "4.15.0-55-generic", 
        "codename" : "bionic", 
        "platform" : "ubuntu", 
        "version" : "18.04.3 LTS (Bionic Beaver)", 
        "family" : "debian", 
        ![name: "Ubuntu" 
      }, 
      id: "4e3eb308e7f24789b4ee0b6b873e5414", 
      "containerized" : false 
    }, 
    "agent" : { # ECS metadata 
      "ephemeral_id" : "7c725f8a-ac03-4f2d-a40c-3695a3591699", 
      "hostname" : "noether", 
      "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce", 
      "version" : "8.0.0", 
      "type" : "metricbeat" 
    }, 
    "event" : { # ECS metadata 
      "dataset" : "system.cpu", 
      "module" : "system", 
      "duration" : 725494 
    }, 
    "metricset" : { # metricbeat metadata 
      "name" : "cpu" 
    }, 
    "service" : { # metricbeat metadata 
      "type" : "system" 
    }, 
    "system" : { # metricbeat time series data  
      "cpu" : { 
        "softirq" : { 
          "pct" : 0.0112, 
        }, 
        "steal" : { 
          "pct" : 0 
        }, 
        "cores" : 8, 
        "irq" : { 
          "pct" : 0 
        }, 
        "idle" : { 
          "pct" : 6.9141 
        }, 
        "nice" : { 
          "pct" : 0 
        }, 
        "user" : { 
          "pct" : 0.7672, 
        }, 
        "system" : { 
          "pct" : 0.3024, 
        }, 
        "iowait" : { 
          "pct" : 0.0051, 
        }, 
        "total" : 
          "pct" : 1.0808 
        } 
      } 
    } 
  } 
}

En resumen, en un documento de Metricbeat, los datos temporales y los metadatos están combinados, y necesitas conocimientos específicos sobre el formato del documento para recuperar exactamente lo que necesitas.

Por el contrario, si deseas procesar, analizar o visualizar datos temporales, los datos habitualmente deberían estar en un formato tipo tabla como el siguiente:

<series name> <timestamp> <value> <key-value pairs> 
system.cpu.user.pct 1565610800000 0.843 host.name=”noether” 
system.cpu.user.pct 1565610800000 0.951 host.name=”hilbert” 
system.cpu.user.pct 1565610810000 0.865 host.name=”noether” 
system.cpu.user.pct 1565610810000 0.793 host.name=”hilbert” 
system.cpu.user.pct 1565610820000 0.802 host.name=”noether” 
system.cpu.user.pct 1565610820000 0.679 host.name=”hilbert”

Las búsquedas de Elasticsearch pueden ayudarte a recuperar de forma programática datos temporales en un formato muy similar a esta tabla, y en los ejemplos siguientes se muestra cómo hacerlo. Si deseas experimentar con las búsquedas tú mismo, necesitarás una instancia de Elasticsearch y una instalación de Metricbeat en funcionamiento que esté enviando datos para los conjuntos de métricas system.cpu y system.network. Para una breve introducción a Metricbeat, echa un vistazo a la documentación sobre primeros pasos.

Puedes ejecutar todas las búsquedas desde la consola Dev Tools de Kibana. Si no la usaste antes, puedes encontrar una breve introducción en los documentos sobre la consola de Kibana. Ten en cuenta que necesitarás cambiar el nombre de host en las búsquedas de ejemplo.

Suponemos que Metricbeat está configurado conforme a los ajustes predeterminados. Esto significa que creará un índice por día, y que estos índices tendrán el nombre ‘metricbeat-VERSIÓN-FECHA-CONTADOR’, por ejemplo, algo como metricbeat-7.3.0-2019.08.06-000009. Para buscar en todos estos índices a la vez, usaremos un comodín:

Ejemplo de búsqueda:

GET metricbeat-*/_search

Y el ejemplo de respuesta siguiente:

{ 
  "took" : 2, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 10000, 
      "relation" : "gte" 
    }, 
    "max_score" : 1.0, 
    "hits" : [...] 
  } 
}

Naturalmente, esta búsqueda supera el límite de documentos de Elasticsearch que se devolverá en una búsqueda. Las coincidencias reales se omitieron aquí, pero quizás quieras desplazarte por los resultados y compararlos con el documento con anotaciones anterior.

Según el tamaño de tu infraestructura monitoreada, puede haber una gran cantidad de documentos de Metricbeat, pero es poco frecuente que necesites una serie temporal desde el inicio del tiempo (registrado); así que, comencemos con un rango de datos, en este caso, los últimos cinco minutos:

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": { 
    "range": { 
      "@timestamp": { 
        "gte": "now-5m" 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  "took" : 4, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : 0.0, 
    "hits" : [...] 
  } 
}

Este tamaño puede controlarse mucho más. Sin embargo, el sistema en el que se ejecuta esta búsqueda solo tiene un host que le reporta, por lo que en un entorno de producción la cantidad de coincidencias seguirá siendo muy alta.

Para recuperar todos los datos de CPU de un host específico, un primer intento básico de búsqueda de Elasticsearch puede ser agregar filtros para host.name y el conjunto de métricas system.cpu:

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": { 
    "bool": { 
      "filter": [ 
        { 
          "range": { 
            "@timestamp": { 
              "gte": "now-5m" 
            } 
          } 
        }, 
        { 
          "bool": { 
            "should": [ 
              { 
                "match_phrase": { 
                  "host.name": "noether" 
                } 
              }, 
              { 
                "match_phrase": { 
                  "event.dataset": "system.cpu" 
                } 
              } 
            ] 
          } 
        } 
      ] 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  "took" : 8, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : 0.0, 
    "hits" : [...] 
  } 
}

Esta búsqueda aún devuelve una gran cantidad de documentos, y todos contienen los datos completos enviados por Metricbeat para el conjunto de métricas system.cpu. El resultado no es muy útil por varias razones.

En primer lugar, tendríamos que recuperar todos los documentos del rango de tiempo completo. Tan pronto alcancemos el límite configurado, Elasticsearch no los devolverá en un intento; tratará de clasificarlos, lo cual no tiene sentido en nuestra búsqueda, y no devolverá los documentos ordenados por marca de tiempo.

En segundo lugar, solo nos interesa una pequeña parte de cada documento: la marca de tiempo, algunos valores de métricas y probablemente algunos otros campos de metadatos. Devolver _source completo de Elasticsearch y luego elegir los datos de los resultados de la búsqueda es muy poco eficiente.

Un enfoque para resolver esto es usar las agregaciones de Elasticsearch.

Ejemplo 1: Porcentaje de CPU, muestra disminuida

Para comenzar, veamos histogramas de fecha. Una agregación de histogramas de fecha devolverá un valor por intervalo de tiempo. Las cubetas devueltas ya están ordenadas por tiempo, y el intervalo, o tamaño de cubeta, puede especificarse para que coincida con los datos. En este ejemplo, se eligieron 10 segundos como tamaño del intervalo porque Metricbeat envía datos del módulo del sistema cada 10 segundos de manera predeterminada. El parámetro size: 0 de nivel superior especifica que ya no nos interesan las coincidencias en sí, sino solo en la agregación, por lo que no se devolverán documentos.

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "fixed_interval": "10s" 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : null, 
    "hits" : [ ] 
  }, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:03:20.000Z", 
          "key" : 1565615000000, 
          "doc_count" : 1 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:03:30.000Z", 
          "key" : 1565615010000, 
          "doc_count" : 1 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:03:40.000Z", 
          "key" : 1565615020000, 
          "doc_count" : 1 
        }, 
        ... 
      ] 
    } 
  } 
}

Para cada cubeta, esto devuelve la marca de tiempo en key, un key_as_string útil que contiene un texto de fecha y hora legible para las personas, y la cantidad de documentos que acabaron en la cubeta.

El valor de doc_count es 1, porque el tamaño de la cubeta coincide con el periodo de reportes de Metricbeat. No es muy útil de lo contrario, por lo que añadiremos otra agregación para ver los valores de métricas reales. En este paso, debemos decidir cuál será esta agregación (para valores numéricos, avg, min y max son buenos candidatos), pero mientras tengamos solo un documento por cubeta, no es realmente importante cuál elijamos. En el ejemplo siguiente se ilustra esto devolviendo las agregaciones avg, min y max de los valores de la métrica system.cpu.user.pct en cubetas de 10 segundos de duración:

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "fixed_interval": "10s" 
      }, 
      "aggregations": { 
        "myActualCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myActualCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myActualCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:12:40.000Z", 
          "key" : 1565615560000, 
          "doc_count" : 1, 
          "myActualCpuUserMin" : { 
            "value" : 1.002 
          }, 
          "myActualCpuUserAvg" : { 
            "value" : 1.002 
          }, 
          "myActualCpuUserMax" : { 
            "value" : 1.002 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:12:50.000Z", 
          "key" : 1565615570000, 
          "doc_count" : 1, 
          "myActualCpuUserMin" : { 
            "value" : 0.866 
          }, 
          "myActualCpuUserAvg" : { 
            "value" : 0.866 
          }, 
          "myActualCpuUserMax" : { 
            "value" : 0.866 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Como puedes ver, los valores de myActualCpuUserMin, myActualCpuUserAvg y myActualCpuUserMax son los mismos en cada cubeta, por lo cual, si necesitas recuperar los valores sin procesar de una serie temporal que se reportó en intervalos regulares, puedes usar un histograma de fecha para lograrlo.

Sin embargo, la mayor parte del tiempo, no te interesará cada uno de los puntos de datos, en especial cuando las mediciones se realizan cada pocos segundos. Para muchos fines es mejor, en realidad, tener datos con granularidad más gruesa: por ejemplo, una visualización solo tiene una cantidad limitada de pixeles para mostrar variaciones en una serie temporal, por lo que los datos con mayor granularidad se descartarán al momento de la representación.

Suele disminuirse la muestra de los datos temporales a una granularidad que coincida con los requisitos del paso de procesamiento que siga. Durante la disminución de la muestra, varios puntos de datos dentro de un periodo dado se reducen a uno solo. En nuestro ejemplo de monitoreo del servidor, los datos pueden medirse cada 10 segundos, pero para la mayoría de los fines, el promedio de todos los valores en un minuto sería suficiente. A propósito, la disminución de la muestra es exactamente lo que hace una agregación de histograma de fecha cuando encuentra más de un documento por cubeta y se usan las agregaciones anidadas correctas.

En el ejemplo siguiente se muestra el resultado de un histograma de fecha con agregaciones avg, min y max anidadas durante una cubeta de 1 minuto completo, lo que brinda un primer ejemplo de disminución de la muestra. El uso de calendar_interval en lugar de fixed_interval alinea los límites de la cubeta a minutos completos.

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "calendar_interval": "1m" 
      }, 
      "aggregations": { 
        "myDownsampledCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:27:00.000Z", 
          "key" : 1565616420000, 
          "doc_count" : 4, 
          "myDownsampledCpuUserMax" : { 
            "value" : 0.927 
          }, 
          "myDownsampledCpuUserMin" : { 
            "value" : 0.6980000000000001 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8512500000000001 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:28:00.000Z", 
          "key" : 1565616480000, 
          "doc_count" : 6, 
          "myDownsampledCpuUserMax" : { 
            "value" : 0.838 
          }, 
          "myDownsampledCpuUserMin" : { 
            "value" : 0.5670000000000001 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.7040000000000001 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Como puedes ver, los valores de myActualCpuUserMin, myActualCpuUserAvg y myActualCpuUserMax ahora son diferentes y dependen de la agregación usada.

El método que se usa para la disminución de la muestra depende de la métrica. Para el porcentaje de CPU, la agregación avg durante un minuto es adecuada; para métricas como longitudes de cola o carga de sistema, una agregación max probablemente sea más apropiada.

En este punto, también es posible usar Elasticsearch para realizar algunas operaciones matemáticas básicas y calcular una serie temporal que no se encuentre entre los datos originales. Suponiendo que trabajamos con la agregación avg para CPU, nuestro ejemplo podría mejorarse para devolver el CPU del usuario, el CPU del sistema y la suma de usuario + sistema dividida por CPU cores, de esta manera:

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...},   # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "calendar_interval": "1m" 
      }, 
      "aggregations": { 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuSystemAvg": { 
          "avg": { 
            "field": "system.cpu.system.pct" 
          } 
        }, 
        "myCpuCoresMax": { 
          "max": { 
            "field": "system.cpu.cores" 
          } 
        }, 
        "myCalculatedCpu": { 
          "bucket_script": { 
            "buckets_path": { 
              "user": "myDownsampledCpuUserAvg", 
              "system": "myDownsampledCpuSystemAvg", 
              "cores": "myCpuCoresMax" 
            }, 
            "script": { 
              "source": "(params.user + params.system) / params.cores", 
              "lang": "painless" 
            } 
          } 
        } 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:32:00.000Z", 
          "key" : 1565616720000, 
          "doc_count" : 2, 
          "myDownsampledCpuSystemAvg" : { 
            "value" : 0.344 
          }, 
          "myCpuCoresMax" : { 
            "value" : 8.0 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8860000000000001 
          }, 
          "myCalculatedCpu" : { 
            "value" : 0.15375 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:33:00.000Z", 
          "key" : 1565616780000, 
          "doc_count" : 6, 
          "myDownsampledCpuSystemAvg" : { 
            "value" : 0.33416666666666667 
          }, 
          "myCpuCoresMax" : { 
            "value" : 8.0 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8895 
          }, 
          "myCalculatedCpu" : { 
            "value" : 0.15295833333333334 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Ejemplo 2: Tráfico de red: Términos y agregaciones derivadas

Un ejemplo ligeramente más elaborado de lo útil que pueden ser las agregaciones de Elasticsearch para los datos temporales es el conjunto de métricas system.network. La parte relevante de un documento de conjunto de métricas system.network se ve así:

{ 
... 
"system": { 
        "network": { 
            "in": { 
                "bytes": 37904869172, 
                "dropped": 32, 
                "errors": 0, 
                "packets": 32143403 
            }, 
            "name": "wlp4s0", 
            "out": { 
                "bytes": 6299331926, 
                "dropped": 0, 
                "errors": 0, 
                "packets": 13362703 
            } 
        } 
    } 
... 
}

Metricbeat enviará un documento por cada interfaz de red presente en el sistema. Estos documentos tendrán la misma marca de tiempo, pero diferentes valores en el campo system.network.name, uno por interfaz de red.

Cualquier otra agregación que se realice debe hacerse por interfaz, por lo que cambiamos la agregación de histograma de fecha de nivel superior de los ejemplos anteriores a una agregación de términos en el campo system.network.name.

Ten en cuenta que para que esto funcione, el campo en el que se agrega debe estar mapeado como campo de palabra clave. Si trabajas con la plantilla de índice predeterminada que proporciona Metricbeat, este mapeo ya se realizó por ti. De lo contrario, en la página de documentos sobre plantillas de Metricbeat encontrarás una descripción breve de lo que necesitas hacer.

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...}, # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          } 
        } 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "enp0s31f6", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "lo", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T13:39:00.000Z", 
                "key" : 1565617140000, 
                "doc_count" : 1 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:40:00.000Z", 
                "key" : 1565617200000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:41:00.000Z", 
                "key" : 1565617260000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:42:00.000Z", 
                "key" : 1565617320000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:43:00.000Z", 
                "key" : 1565617380000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:44:00.000Z", 
                "key" : 1565617440000, 
                "doc_count" : 4 
              } 
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Como en el ejemplo de CPU, sin una agregación anidada, la agregación de histograma de fecha solo devuelve doc_count, que no resulta muy útil.

Los campos de bytes contienen valores incrementales de forma monótona. El valor de estos campos contiene la cantidad de bytes enviados o recibidos desde la última vez que se inició la máquina, por lo que aumenta con cada medición. En este caso, la agregación anidada correcta es max, de modo que el valor de muestra disminuida contendrá la medición más alta, y por lo tanto más reciente, tomada durante el intervalo de la cubeta.

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          }, 
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            }, 
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          ... 
        }, 
        { 
          "key" : "enp0s31f6", 
          ... 
        }, 
        { 
          "key" : "lo", 
          ... 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 30, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T13:50:00.000Z", 
                "key" : 1565617800000, 
                "doc_count" : 2, 
                "myNetworkInBytesMax" : { 
                  "value" : 2.991659837E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.46578365E8 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:51:00.000Z", 
                "key" : 1565617860000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 2.992027006E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.46791988E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 367169.0, 
                  "normalized_value" : 6119.483333333334 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 213623.0, 
                  "normalized_value" : 3560.383333333333 
                } 
              }, 
              ... 
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Para obtener una tasa de bytes por segundo del contador que aumenta de forma monótona, se puede usar una agregación derivada. Cuando esta agregación se pasa al parámetro opcional unit, devuelve el valor deseado por unidad en el campo normalized_value:

Ejemplo de búsqueda:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          }, 
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            }, 
            "myNetworkInBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkInBytesMax", 
                "unit": "1s" 
              } 
            }, 
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            }, 
            "myNetworkoutBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkOutBytesMax", 
                "unit": "1s" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

Y el ejemplo de respuesta siguiente:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          ... 
        }, 
        { 
          "key" : "enp0s31f6", 
          ... 
        }, 
        { 
          "key" : "lo", 
          ... 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 30, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T14:07:00.000Z", 
                "key" : 1565618820000, 
                "doc_count" : 4, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.030494669E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56084749E8 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T14:08:00.000Z", 
                "key" : 1565618880000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.033793744E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56323416E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 3299075.0, 
                  "normalized_value" : 54984.583333333336 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 238667.0, 
                  "normalized_value" : 3977.7833333333333 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T14:09:00.000Z", 
                "key" : 1565618940000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.037045046E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56566282E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 3251302.0, 
                  "normalized_value" : 54188.36666666667 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 242866.0, 
                  "normalized_value" : 4047.766666666667 
                } 
              }, 
              ...   
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Puedes intentar todo esto en tu propio cluster o, si aún no tienes uno, puedes activar una prueba gratuita de Elasticsearch Service en Elastic Cloud o descargar la distribución predeterminada del Elastic Stack. Comienza a enviar datos de tus sistemas con Metricbeat y realiza búsquedas.