Consulta e agregação de dados de série temporal no Elasticsearch

Historicamente, o Elasticsearch é um mecanismo de busca, que mantém seus índices de busca em um banco de dados Lucene. A partir desse ponto inicial, no entanto, o próprio Elasticsearch evoluiu e se tornou um datastore escalável e de alto desempenho, com possibilidade de ser usado em cluster. Seu formato de índice ainda reflete o histórico de seus primórdios, mas é usado por todos os tipos de usuários para todos os tipos de finalidades.

Uma dessas finalidades é a capacidade de armazenar, processar e recuperar dados de série temporal. Os dados de série temporal são caracterizados pelo fato de cada ponto de dados estar associado a um registro de data e hora preciso. Na maioria das vezes, um ponto de dados representa algum tipo de medição realizada em um momento específico. Pode ser uma cotação de ações, uma observação científica ou a carga de um servidor.

Embora existam várias implementações de banco de dados especializadas no tratamento de dados de série temporal, não há um formato comum no qual as séries temporais sejam armazenadas e consultadas e teoricamente qualquer mecanismo de banco de dados pode ser usado para lidar com dados de série temporal.

Uma entrada de banco de dados de série temporal, em forma abstrata, consiste nos seguintes elementos:

  • o nome da série
  • um registro de data e hora
  • um valor
  • um conjunto de pares de chave-valor ou tags, contendo mais informações sobre a série

Em um caso de uso de monitoramento de servidor, sempre haverá um par de chave-valor para especificar a qual host uma série temporal pertence, mas será possível incluir qualquer tipo de informação adicional, que poderá ser usada posteriormente apenas para solicitar métricas sobre um determinado conjunto de hosts. Exemplos disso seriam os hosts que executam serviços específicos, apenas aqueles que pertencem a um ambiente de produção ou instâncias executadas em um determinado provedor de serviços em nuvem.

Para tornar isso mais prático, vamos usar os dados do Metricbeat como um exemplo de como você pode usar as consultas do Elasticsearch para filtrar determinadas informações de série temporal dos seus dados.

Cada documento do Metricbeat contém as seguintes informações:

  • Um registro de data e hora
  • Dados reais de série temporal
    • Para o Metricbeat, o formato dessa parte do documento é descrito na documentação do Metricbeat. Saiba mais sobre o conjunto de métricas cpu do módulo system, consultando a documentação do conjunto de métricas.
  • Metadados sobre as próprias métricas contidas em um documento. O Metricbeat usa os campos event.module e event.dataset do ECS para especificar qual módulo do Metricbeat criou o documento e qual conjunto de métricas está contido no documento.
    • Isso pode ser útil primeiramente para descobrir quais métricas estão presentes antes de você tentar recuperar dados de série temporal.
  • Metadados sobre a instância, seja um host físico, uma máquina virtual ou uma entidade menor, como um pod do Kubernetes ou um container do Docker
    • Esses metadados seguem o Elastic Common Schemapara que seja possível fazer a correspondência deles com dados de outras fontes que também usem o ECS.

Como exemplo, veja como é um documento do Metricbeat do conjunto de métricas system.cpu. Os comentários em linha no objeto _source indicam onde você pode encontrar mais informações sobre o campo:

Observação: os documentos incluem comentários com # no JSON para facilitar a leitura.

{ 
  "_index" : "metricbeat-8.0.0-2019.08.12-000001", 
  "_type" : "_doc", 
  "_id" : "vWm5hWwB6vlM0zxdF3Q5", 
  "_score" : 0.0, 
  "_source" : { 
    "@timestamp" : "2019-08-12T12:06:34.572Z", 
    "ecs" : { # ECS metadata 
      "version" : "1.0.1" 
    }, 
    "host" : { # ECS metadata 
      "name" : "noether", 
      "hostname" : "noether", 
      "architecture" : "x86_64", 
      "os" : { 
        "kernel" : "4.15.0-55-generic", 
        "codename" : "bionic", 
        "platform" : "ubuntu", 
        "version" : "18.04.3 LTS (Bionic Beaver)", 
        "family" : "debian", 
        "name" : "Ubuntu" 
      }, 
      "id" : "4e3eb308e7f24789b4ee0b6b873e5414", 
      "containerized" : false 
    }, 
    "agent" : { # ECS metadata 
      "ephemeral_id" : "7c725f8a-ac03-4f2d-a40c-3695a3591699", 
      "hostname" : "noether", 
      "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce", 
      "version" : "8.0.0", 
      "type" : "metricbeat" 
    }, 
    "event" : { # ECS metadata 
      "dataset" : "system.cpu", 
      "module" : "system", 
      "duration" : 725494 
    }, 
    "metricset" : { # metricbeat metadata 
      "name" : "cpu" 
    }, 
    "service" : { # metricbeat metadata 
      "type" : "system" 
    }, 
    "system" : { # metricbeat time series data  
      "cpu" : { 
        "softirq" : { 
          "pct" : 0.0112 
        }, 
        "steal" : { 
          "pct" : 0 
        }, 
        "cores" : 8, 
        "irq" : { 
          "pct" : 0 
        }, 
        "idle" : { 
          "pct" : 6.9141 
        }, 
        "nice" : { 
          "pct" : 0 
        }, 
        "user" : { 
          "pct" : 0.7672 
        }, 
        "system" : { 
          "pct" : 0.3024 
        }, 
        "iowait" : { 
          "pct" : 0.0051 
        }, 
        "total" : { 
          "pct" : 1.0808 
        } 
      } 
    } 
  } 
}

Para resumir, em um documento do Metricbeat, os dados de série temporal e os metadados estão misturados, e você precisa de conhecimento específico sobre o formato do documento para recuperar exatamente aquilo de que precisa.

Por outro lado, se você quer processar, analisar ou visualizar dados de série temporal, os dados normalmente devem estar em um formato de tabela como este:

<series name> <timestamp> <value> <key-value pairs> 
system.cpu.user.pct 1565610800000 0.843 host.name=”noether” 
system.cpu.user.pct 1565610800000 0.951 host.name=”hilbert” 
system.cpu.user.pct 1565610810000 0.865 host.name=”noether” 
system.cpu.user.pct 1565610810000 0.793 host.name=”hilbert” 
system.cpu.user.pct 1565610820000 0.802 host.name=”noether” 
system.cpu.user.pct 1565610820000 0.679 host.name=”hilbert”

As consultas do Elasticsearch podem ajudar você a recuperar programaticamente dados de série temporal em um formato muito próximo ao de uma tabela como essa, e os exemplos a seguir mostram como fazer isso. Se quiser experimentar fazer consultas, você precisará de uma instância do Elasticsearch e de uma instalação do Metricbeat em execução que esteja enviando dados para os conjuntos de métricas system.cpu e system.network. Para saber como começar a trabalhar com o Metricbeat, consulte a documentação de introdução.

Você pode executar todas as consultas no console Dev Tools do Kibana. Se você nunca o usou antes, pode encontrar uma breve introdução nos documentos sobre o console do Kibana. Observe que você precisará alterar o nome do host nas consultas de exemplo.

Vamos supor que o Metricbeat esteja configurado seguindo a configuração padrão. Isso significa que ele criará um índice por dia, e esses índices serão nomeados no padrão ‘metricbeat-VERSION-DATE-COUNTER’, por exemplo, algo como metricbeat-7.3.0-2019.08.06-000009. Para consultar todos esses índices de uma só vez, usaremos um curinga:

Consulta de amostra:

GET metricbeat-*/_search

E a seguinte resposta de exemplo:

{ 
  "took" : 2, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 10000, 
      "relation" : "gte" 
    }, 
    "max_score" : 1.0, 
    "hits" : [...] 
  } 
}

Naturalmente, essa consulta excede o limite de documentos do Elasticsearch que serão retornados em uma única consulta. As ocorrências reais foram omitidas aqui, mas convém rolar a tela para ver seus resultados e compará-los com o documento anotado acima.

Dependendo do tamanho da sua infraestrutura monitorada, pode haver um grande número de documentos do Metricbeat, mas é raro que você precise de uma série temporal desde o início da hora (gravada); portanto, vamos começar com um período, neste caso, os últimos cinco minutos:

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": { 
    "range": { 
      "@timestamp": { 
        "gte": "now-5m" 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  "took" : 4, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : 0.0, 
    "hits" : [...] 
  } 
}

Agora esse é um tamanho muito mais gerenciável. No entanto, o sistema no qual essa consulta é executada tem apenas um host gerando relatórios para ele; portanto, em um ambiente de produção, o número de ocorrências ainda será muito alto.

Para recuperar todos os dados da CPU de um host específico, uma primeira tentativa ingênua em uma consulta do Elasticsearch poderia ser adicionar filtros ao host.name e ao conjunto de métricas system.cpu:

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": { 
    "bool": { 
      "filter": [ 
        { 
          "range": { 
            "@timestamp": { 
              "gte": "now-5m" 
            } 
          } 
        }, 
        { 
          "bool": { 
            "should": [ 
              { 
                "match_phrase": { 
                  "host.name": "noether" 
                } 
              }, 
              { 
                "match_phrase": { 
                  "event.dataset": "system.cpu" 
                } 
              } 
            ] 
          } 
        } 
      ] 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  "took" : 8, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : 0.0, 
    "hits" : [...] 
  } 
}

Essa consulta ainda retorna um grande número de documentos, todos contendo os dados completos enviados pelo Metricbeat para o conjunto de métricas system.cpu. Não é um resultado muito útil por algumas razões.

Primeiro, precisaríamos recuperar todos os documentos de todo o intervalo de tempo. Assim que atingirmos o limite configurado, o Elasticsearch não retornará esses valores de uma só vez; ele tentará classificá-los, o que não faz sentido com a nossa consulta; e ele não retornará os documentos classificados por registro de data e hora.

Segundo, estamos interessados apenas em uma pequena parte de cada documento: o registro de data e hora, alguns valores de métricas e possivelmente alguns campos de metadados adicionais. Retornar todo o _source do Elasticsearch e depois selecionar os dados do resultado da consulta é muito ineficiente.

Uma abordagem para resolver isso é usar as agregações do Elasticsearch.

Exemplo 1: porcentagem da CPU, amostragem reduzida

Para começar, vejamos os histogramas de data. Uma agregação de histograma de data retornará um único valor por intervalo de tempo. Os blocos retornados já estão ordenados por horário, e o intervalo ou tamanho do bloco pode ser especificado para corresponder aos dados. Neste exemplo, o tamanho do intervalo escolhido foi de 10 segundos porque o Metricbeat envia dados do módulo do sistema a cada 10 segundos por padrão. O parâmetro size: 0 de nível superior especifica que não estamos mais interessados nas ocorrências reais, mas apenas na agregação; portanto, nenhum documento será retornado.

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "fixed_interval": "10s" 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : null, 
    "hits" : [ ] 
  }, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:03:20.000Z", 
          "key" : 1565615000000, 
          "doc_count" : 1 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:03:30.000Z", 
          "key" : 1565615010000, 
          "doc_count" : 1 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:03:40.000Z", 
          "key" : 1565615020000, 
          "doc_count" : 1 
        }, 
        ... 
      ] 
    } 
  } 
}

Para cada bloco, isso retorna o registro de data e hora na key, uma key_as_string útil que contém uma string de data e hora legível por humanos e o número de documentos que foram incluídos no bloco.

Esse doc_count é 1 porque o tamanho do bloco corresponde ao período de relatório do Metricbeat. Caso contrário, não será muito útil; portanto, para ver os valores reais das métricas, adicionaremos outra agregação. Nesta etapa, precisamos decidir qual será essa agregação — para valores numéricos, avg, min e max são bons candidatos —, mas contanto que tenhamos apenas um documento por bloco, na verdade, não fará diferença qual escolhermos. O exemplo a seguir ilustra isso retornando as agregações avg, min e max nos valores da métrica system.cpu.user.pct em blocos de 10 segundos:

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "fixed_interval": "10s" 
      }, 
      "aggregations": { 
        "myActualCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myActualCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myActualCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:12:40.000Z", 
          "key" : 1565615560000, 
          "doc_count" : 1, 
          "myActualCpuUserMin" : { 
            "value" : 1.002 
          }, 
          "myActualCpuUserAvg" : { 
            "value" : 1.002 
          }, 
          "myActualCpuUserMax" : { 
            "value" : 1.002 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:12:50.000Z", 
          "key" : 1565615570000, 
          "doc_count" : 1, 
          "myActualCpuUserMin" : { 
            "value" : 0.866 
          }, 
          "myActualCpuUserAvg" : { 
            "value" : 0.866 
          }, 
          "myActualCpuUserMax" : { 
            "value" : 0.866 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Como você pode ver, os valores para myActualCpuUserMin, myActualCpuUserAvg e myActualCpuUserMax são os mesmos em cada bloco; portanto, se precisar recuperar os valores brutos de uma série temporal que foi relatada em intervalos regulares, você poderá usar um histograma de data para conseguir isso.

Na maioria das vezes, no entanto, você não terá interesse em cada um dos pontos de dados, principalmente quando forem feitas medições a cada poucos segundos. Para muitas finalidades, na verdade, é melhor ter dados com granularidade mais grossa: por exemplo, uma visualização tem apenas uma quantidade limitada de pixels para exibir variações em uma série temporal; portanto, dados com granularidade mais alta serão descartados no momento da renderização.

Os dados de série temporal geralmente são reduzidos a uma granularidade que corresponda aos requisitos da etapa do processamento que venha a seguir. Na redução da amostragem, vários pontos de dados dentro de um determinado período são reduzidos a um só. Em nosso exemplo de monitoramento de servidor, os dados podem ser medidos a cada 10 segundos, mas para a maioria das finalidades, a média de todos os valores em um minuto já seria ótima. Aliás, a redução da amostra é exatamente o que uma agregação de histograma de data faz quando encontra mais de um documento por bloco, e as agregações aninhadas corretas são usadas.

O exemplo a seguir mostra o resultado de um histograma de data com agregaçõesavg, min e max aninhadas em um bloco de 1 minuto completo, fornecendo um primeiro exemplo de redução de amostra. O uso de calendar_interval em vez de fixed_interval alinha os limites do bloco a minutos completos.

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "calendar_interval": "1m" 
      }, 
      "aggregations": { 
        "myDownsampledCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:27:00.000Z", 
          "key" : 1565616420000, 
          "doc_count" : 4, 
          "myDownsampledCpuUserMax" : { 
            "value" : 0.927 
          }, 
          "myDownsampledCpuUserMin" : { 
            "value" : 0.6980000000000001 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8512500000000001 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:28:00.000Z", 
          "key" : 1565616480000, 
          "doc_count" : 6, 
          "myDownsampledCpuUserMax" : { 
            "value" : 0.838 
          }, 
          "myDownsampledCpuUserMin" : { 
            "value" : 0.5670000000000001 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.7040000000000001 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Como você pode ver, os valores para myActualCpuUserMin, myActualCpuUserAvg e myActualCpuUserMax agora são diferentes e dependem da agregação usada.

O método que será usado para reduzir a amostragem depende da métrica. Para porcentagem da CPU, a agregação avg em um minuto é ótima; para métricas como comprimentos de fila ou carga do sistema, uma agregação max poderia ser mais apropriada.

Nesse ponto, também é possível usar o Elasticsearch para executar alguma aritmética básica e calcular uma série temporal que não esteja nos dados originais. Supondo que trabalhemos com a agregação avg para CPU, nosso exemplo poderia ser aprimorado para retornar a CPU do usuário, a CPU do sistema e a soma de usuário + sistema dividida pelos CPU cores, da seguinte forma:

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...},   # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "calendar_interval": "1m" 
      }, 
      "aggregations": { 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuSystemAvg": { 
          "avg": { 
            "field": "system.cpu.system.pct" 
          } 
        }, 
        "myCpuCoresMax": { 
          "max": { 
            "field": "system.cpu.cores" 
          } 
        }, 
        "myCalculatedCpu": { 
          "bucket_script": { 
            "buckets_path": { 
              "user": "myDownsampledCpuUserAvg", 
              "system": "myDownsampledCpuSystemAvg", 
              "cores": "myCpuCoresMax" 
            }, 
            "script": { 
              "source": "(params.user + params.system) / params.cores", 
              "lang": "painless" 
            } 
          } 
        } 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:32:00.000Z", 
          "key" : 1565616720000, 
          "doc_count" : 2, 
          "myDownsampledCpuSystemAvg" : { 
            "value" : 0.344 
          }, 
          "myCpuCoresMax" : { 
            "value" : 8.0 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8860000000000001 
          }, 
          "myCalculatedCpu" : { 
            "value" : 0.15375 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:33:00.000Z", 
          "key" : 1565616780000, 
          "doc_count" : 6, 
          "myDownsampledCpuSystemAvg" : { 
            "value" : 0.33416666666666667 
          }, 
          "myCpuCoresMax" : { 
            "value" : 8.0 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8895 
          }, 
          "myCalculatedCpu" : { 
            "value" : 0.15295833333333334 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Exemplo 2: tráfego de rede — termos e agregações derivadas

Um exemplo um pouco mais elaborado de como as agregações do Elasticsearch podem ser úteis para dados de série temporal é o conjunto de métricas system.network. A parte relevante de um documento de conjunto de métricas system.network fica assim:

{ 
... 
"system": { 
        "network": { 
            "in": { 
                "bytes": 37904869172, 
                "dropped": 32, 
                "errors": 0, 
                "packets": 32143403 
            }, 
            "name": "wlp4s0", 
            "out": { 
                "bytes": 6299331926, 
                "dropped": 0, 
                "errors": 0, 
                "packets": 13362703 
            } 
        } 
    } 
... 
}

O Metricbeat enviará um documento para cada interface de rede presente no sistema. Esses documentos terão o mesmo registro de data e hora, mas valores diferentes para o campo system.network.name, sendo um para cada interface de rede.

Qualquer agregação adicional precisará ser feita por interface; portanto, alteraremos a agregação do histograma de data de nível superior dos exemplos anteriores para uma agregação de termos no campo system.network.name.

Observe que, para que isso funcione, o campo agregado precisa ser mapeado como um campo de palavra-chave. Se você trabalha com o modelo de índice padrão fornecido pelo Metricbeat, esse mapeamento já foi configurado para você. Caso contrário, a página do documento de modelos do Metricbeat apresenta uma breve descrição do que você precisa fazer.

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...}, # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          } 
        } 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "enp0s31f6", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "lo", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T13:39:00.000Z", 
                "key" : 1565617140000, 
                "doc_count" : 1 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:40:00.000Z", 
                "key" : 1565617200000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:41:00.000Z", 
                "key" : 1565617260000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:42:00.000Z", 
                "key" : 1565617320000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:43:00.000Z", 
                "key" : 1565617380000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:44:00.000Z", 
                "key" : 1565617440000, 
                "doc_count" : 4 
              } 
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Como no exemplo da CPU, sem uma agregação aninhada, a agregação do histograma de data retorna apenas o doc_count, o que não é muito útil.

Os campos para bytes contêm valores crescentes monotonicamente. O valor desses campos contém o número de bytes enviados ou recebidos desde a última vez em que a máquina foi iniciada, aumentando, assim, a cada medição. Nesse caso, a agregação aninhada correta é max, para que o valor da amostra reduzida contenha a medição mais alta e, portanto, a mais recente realizada durante o intervalo do bloco.

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          }, 
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            }, 
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          ... 
        }, 
        { 
          "key" : "enp0s31f6", 
          ... 
        }, 
        { 
          "key" : "lo", 
          ... 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 30, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T13:50:00.000Z", 
                "key" : 1565617800000, 
                "doc_count" : 2, 
                "myNetworkInBytesMax" : { 
                  "value" : 2.991659837E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.46578365E8 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:51:00.000Z", 
                "key" : 1565617860000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 2.992027006E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.46791988E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 367169.0, 
                  "normalized_value" : 6119.483333333334 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 213623.0, 
                  "normalized_value" : 3560.383333333333 
                } 
              }, 
              ... 
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Para obter uma taxa de bytes por segundo do contador que aumenta monotonicamente, pode-se usar uma agregação derivada. Quando o parâmetro opcional unit é passado para essa agregação (consulta), ele retorna o valor desejado por unidade no campo normalized_value:

Consulta de amostra:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          }, 
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            }, 
            "myNetworkInBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkInBytesMax", 
                "unit": "1s" 
              } 
            }, 
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            }, 
            "myNetworkoutBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkOutBytesMax", 
                "unit": "1s" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

E a seguinte resposta de exemplo:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          ... 
        }, 
        { 
          "key" : "enp0s31f6", 
          ... 
        }, 
        { 
          "key" : "lo", 
          ... 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 30, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T14:07:00.000Z", 
                "key" : 1565618820000, 
                "doc_count" : 4, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.030494669E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56084749E8 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T14:08:00.000Z", 
                "key" : 1565618880000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.033793744E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56323416E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 3299075.0, 
                  "normalized_value" : 54984.583333333336 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 238667.0, 
                  "normalized_value" : 3977.7833333333333 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T14:09:00.000Z", 
                "key" : 1565618940000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.037045046E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56566282E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 3251302.0, 
                  "normalized_value" : 54188.36666666667 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 242866.0, 
                  "normalized_value" : 4047.766666666667 
                } 
              }, 
              ...   
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Você pode tentar tudo isso no seu próprio cluster ou, se ainda não tiver um cluster, poderá fazer uma avaliação gratuita do Elasticsearch Service no Elastic Cloud ou baixar a distribuição padrão do Elastic Stack. Comece a enviar dados dos seus sistemas com o Metricbeat e fazer consultas!