Engineering

Abfragen und Aggregieren von Zeitreihendaten in Elasticsearch

Elasticsearch ist von seinen Ursprüngen her eine Suchmaschine, die ihre Suchindizes in einer Lucene-Datenbank aufbewahrt. Mittlerweile hat sich Elasticsearch jedoch zu einem schnellen, clusterfähigen und skalierbaren Datenspeicher entwickelt. In seinem Indexformat spiegeln sich weiterhin seine historischen Anfänge wider, es wird aber von den verschiedensten Nutzergruppen für die verschiedensten Zwecke eingesetzt.

Einer dieser Zwecke ist das Speichern, Verarbeiten und Abrufen von Zeitreihendaten. Zeitreihendaten zeichnen sich dadurch aus, dass jeder einzelne Datenpunkt mit einem präzisen Zeitstempel verknüpft ist. In den meisten Fällen steht ein Datenpunkt für einen Messwert zu einem konkreten Zeitpunkt – ob Aktienkurs, wissenschaftliche Beobachtung oder Lastzustand eines Servers.

Es gibt zwar eine Reihe von Datenbankimplementierungen, die auf den Umgang mit Zeitreihendaten spezialisiert sind, aber es fehlt ein allgemein verwendetes Format für das Speichern und Abfragen von Zeitreihen, und theoretisch kann jede Datenbank-Engine Zeitreihendaten verarbeiten.

Ein Zeitreihen-Datenbankeintrag besteht, in abstrakter Form, aus

  • dem Namen der Zeitreihe,
  • einem Zeitstempel,
  • einem Wert und
  • einem Satz von Schlüssel/Wert-Paaren oder Tags, die weitere Informationen zur Zeitreihe enthalten.

Wenn es um die Beobachtung eines Servers geht, gibt es stets ein Schlüssel/Wert-Paar, das angibt, zu welchem Host die Zeitreihe gehört. Die zusätzlichen Informationen, die hinzugefügt werden können, unterliegen aber keinerlei Einschränkungen; sie können später genutzt werden, um nur die Metriken zu einem konkreten Satz von Hosts anzufordern. Auf diese Weise lässt sich zum Beispiel nach Hosts filtern, auf denen spezifische Dienste ausgeführt werden, oder nach Hosts, die zu einer Produktionsumgebung gehören, oder aber nach Instanzen, die auf einem bestimmten Cloudanbieter laufen.

Um dies anschaulicher zu machen, zeigen wir im Folgenden anhand von Metricbeat-Daten, wie Sie Elasticsearch-Abfragen nutzen können, um bestimmte Zeitreiheninformationen aus Ihren Daten herauszufiltern.

Jedes Metricbeat-Dokument enthält die folgenden Informationen:

  • einen Zeitstempel
  • die eigentlichen Zeitreihendaten
    • Für Metricbeat wird das Format dieses Teils des Dokuments in der Metricbeat-Dokumentation beschrieben. Wenn Sie mehr über den Metricset cpu des Moduls system erfahren möchten, sehen Sie sich die Metricset-Dokumentation an.
  • Metadaten über die im Dokument enthaltenen Metriken selbst. Metricbeat nutzt die ECS-Felder event.module und event.dataset, um anzugeben, welches Metricbeat-Modul das Dokument erstellt hat und welcher Metricset im Dokument enthalten ist.
    • Dies kann hilfreich sein, um überhaupt erst einmal herauszufinden, welche Metriken vorhanden sind, bevor man versucht, Zeitreihendaten abzurufen.
  • Metadaten zur Instanz, also ob es sich um einen physischen Host, eine Virtual Machine oder eine kleinere Entität, wie einen Kubernetes-Pod oder einen Docker-Container, handelt
    • Diese Metadaten folgen dem Elastic Common Schema und können daher mit Daten aus anderen Quellen abgeglichen werden, die ebenfalls ECS-konform sind.

Als Beispiel zeigen wir im Folgenden, wie ein Metricbeat-Dokument aus dem Metricset system.cpu aussieht. Den Inline-Kommentaren zum Objekt _source lässt sich entnehmen, wo Sie weitere Informationen zum Feld finden:

Hinweis: Für die bessere Lesbarkeit enthalten Dokumente im JSON-Code #-Kommentare.

{ 
  "_index" : "metricbeat-8.0.0-2019.08.12-000001", 
  "_type" : "_doc", 
  "_id" : "vWm5hWwB6vlM0zxdF3Q5", 
  "_score" : 0.0, 
  "_source" : { 
    "@timestamp" : "2019-08-12T12:06:34.572Z", 
    "ecs" : { # ECS metadata 
      "version" : "1.0.1" 
    }, 
    "host" : { # ECS metadata 
      "name" : "noether", 
      "hostname" : "noether", 
      "architecture" : "x86_64", 
      "os" : { 
        "kernel" : "4.15.0-55-generic", 
        "codename" : "bionic", 
        "platform" : "ubuntu", 
        "version" : "18.04.3 LTS (Bionic Beaver)", 
        "family" : "debian", 
        "name" : "Ubuntu" 
      }, 
      "id" : "4e3eb308e7f24789b4ee0b6b873e5414", 
      "containerized" : false 
    }, 
    "agent" : { # ECS metadata 
      "ephemeral_id" : "7c725f8a-ac03-4f2d-a40c-3695a3591699", 
      "hostname" : "noether", 
      "id" : "e8839acc-7f5e-40be-a3ab-1cc891bcb3ce", 
      "version" : "8.0.0", 
      "type" : "metricbeat" 
    }, 
    "event" : { # ECS metadata 
      "dataset" : "system.cpu", 
      "module" : "system", 
      "duration" : 725494 
    }, 
    "metricset" : { # metricbeat metadata 
      "name" : "cpu" 
    }, 
    "service" : { # metricbeat metadata 
      "type" : "system" 
    }, 
    "system" : { # metricbeat time series data  
      "cpu" : { 
        "softirq" : { 
          "pct" : 0.0112 
        }, 
        "steal" : { 
          "pct" : 0 
        }, 
        "cores" : 8, 
        "irq" : { 
          "pct" : 0 
        }, 
        "idle" : { 
          "pct" : 6.9141 
        }, 
        "nice" : { 
          "pct" : 0 
        }, 
        "user" : { 
          "pct" : 0.7672 
        }, 
        "system" : { 
          "pct" : 0.3024 
        }, 
        "iowait" : { 
          "pct" : 0.0051 
        }, 
        "total" : { 
          "pct" : 1.0808 
        } 
      } 
    } 
  } 
}

Zusammenfassend lässt sich sagen, dass ein Metricbeat-Dokument einen Mix aus Zeitreihen- und Metadaten enthält und es spezifischer Kenntnisse des Dokuments bedarf, um genau das abrufen zu können, was benötigt wird.

Für das Verarbeiten, Analysieren oder Visualisieren von Zeitreihendaten sollten die Daten dagegen in der Regel in einem tabellenartigen Format wie dem folgenden vorliegen:

<series name> <timestamp> <value> <key-value pairs> 
system.cpu.user.pct 1565610800000 0.843 host.name=”noether” 
system.cpu.user.pct 1565610800000 0.951 host.name=”hilbert” 
system.cpu.user.pct 1565610810000 0.865 host.name=”noether” 
system.cpu.user.pct 1565610810000 0.793 host.name=”hilbert” 
system.cpu.user.pct 1565610820000 0.802 host.name=”noether” 
system.cpu.user.pct 1565610820000 0.679 host.name=”hilbert”

Elasticsearch-Abfragen können Ihnen dabei helfen, Zeitreihendaten automatisiert in einem Format abzurufen, das einer solchen Tabelle sehr nahekommt. Die folgenden Beispiele zeigen, wie dabei vorzugehen ist. Wenn Sie selbst mit den Abfragen experimentieren möchten, benötigen Sie eine Elasticsearch-Instanz und eine laufende Metricbeat-Installation, die Daten für die Metricsets system.cpu und system.network sendet. Eine kurze Einführung in Metricbeat finden Sie unter „Getting started with Metricbeat“.

Sie können alle Abfragen von der „Dev Tools“-Konsole von Kibana aus ausführen. Wenn Sie diese noch nicht kennen, sehen Sie sich die kurze Einführung zur Kibana-„Dev Tool“-Konsole an. Beachten Sie, dass Sie in den Beispielabfragen den Hostnamen ändern müssen.

Wir gehen davon aus, dass Metricbeat gemäß Standardkonfiguration eingerichtet ist. Das heißt, Metricbeat erstellt pro Tag einen Index und für die Benennung dieser Indizes gilt das Format „metricbeat-VERSION-DATUM-ZÄHLER“, also z. B. metricbeat-7.3.0-2019.08.06-000009“. Um alle diese Indizes auf einmal abfragen zu können, verwenden wir einen Platzhalter.

Beispiel-Abfrage:

GET metricbeat-*/_search

Beispiel-Antwort auf die Abfrage:

{ 
  "took" : 2, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 10000, 
      "relation" : "gte" 
    }, 
    "max_score" : 1.0, 
    "hits" : [...] 
  } 
}

Diese Abfrage übersteigt natürlich das Elasticsearch-Limit von Dokumenten, die in einer Abfrage zurückgegeben werden. Die eigentlichen Treffer wurden hier ausgelassen, aber Sie können selbstverständlich Ihre Ergebnisse durchscrollen und sie mit dem obigen mit Anmerkungen versehenen Dokument vergleichen.

Je nach Größe der von Ihnen überwachten Infrastruktur kann die Zahl der Metricbeat-Dokumente sehr groß sein, aber es kommt nur selten vor, dass Sie eine Zeitreihe benötigen, die die gesamte aufgezeichnete Zeit umfasst. Lassen Sie uns also mit einem Datumsbereich beginnen – in diesem Fall mit den letzten fünf Minuten.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": { 
    "range": { 
      "@timestamp": { 
        "gte": "now-5m" 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  "took" : 4, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : 0.0, 
    "hits" : [...] 
  } 
}

Jetzt haben wir eine viel besser handhabbare Größe. Das System, auf dem diese Abfrage ausgeführt wird, hat jedoch nur einen Host unter sich, weshalb die Zahl der Treffer in einer Produktionsumgebung weiterhin sehr hoch sein wird.

Um nun alle CPU-Daten für einen spezifischen Host abfragen zu können, könnte man versucht sein, Filter für „host.name“ und den Metricset system.cpu hinzuzufügen.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": { 
    "bool": { 
      "filter": [ 
        { 
          "range": { 
            "@timestamp": { 
              "gte": "now-5m" 
            } 
          } 
        }, 
        { 
          "bool": { 
            "should": [ 
              { 
                "match_phrase": { 
                  "host.name": "noether" 
                } 
              }, 
              { 
                "match_phrase": { 
                  "event.dataset": "system.cpu" 
                } 
              } 
            ] 
          } 
        } 
      ] 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  "took" : 8, 
  "timed_out" : false, 
  "_shards" : { 
    "total" : 1, 
    "successful" : 1, 
    "skipped" : 0, 
    "failed" : 0 
  }, 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : 0.0, 
    "hits" : [...] 
  } 
}

Diese Abfrage gibt aber immer noch eine große Zahl von Dokumenten zurück, die alle die komplette Menge an Daten enthalten, die Metricbeat für den Metricset system.cpu sendet. Das Ergebnis ist nicht sehr hilfreich und das hat mehrere Gründe:

Zum Ersten müssten wir alle Dokumente über den gesamten Zeitraum abrufen. Sobald das konfigurierte Limit erreicht ist, gibt Elasticsearch diese Dokumente nicht einfach in einem Schwung zurück, sondern versucht, sie zu ranken. Das wiederum ergibt bei unserer Abfrage keinen Sinn und führt auch nicht dazu, dass die Dokumente nach Zeitstempel sortiert zurückgegeben werden.

Zum Zweiten interessiert uns bei jedem Dokument nur ein kleiner Teil: der Zeitstempel, ein paar Metrikwerte und vielleicht noch einige zusätzliche Metadatenfelder. Es wäre sehr ineffizient, die gesamten _source-Daten aus Elasticsearch abzurufen und dann aus dem Abfrageergebnis die gewünschten Daten herauszusuchen.

Ein möglicher Lösungsansatz wäre die Verwendung von Elasticsearch-Aggregationen.

Beispiel 1: CPU-Prozentsatz mit Downsampling

Sehen wir uns zunächst einmal Datumshistogramm-Aggregationen an. Eine Datumshistogramm-Aggregation gibt pro Zeitintervall genau einen Wert zurück. Die zurückgegebenen Klassen („Buckets“) sind bereits nach der Zeit geordnet, und das Intervall, also die Klassengröße, kann den Daten entsprechend festgelegt werden. In diesem Beispiel wurde als Intervallgröße 10 Sekunden gewählt, weil Metricbeat standardmäßig alle 10 Sekunden Daten aus dem Modul „system“ sendet. Der allem übergeordnete Parameter size: 0 gibt an, dass uns die eigentlichen Treffer nicht mehr interessieren, sondern nur die Aggregation. Daher werden keine Dokumente zurückgegeben.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "fixed_interval": "10s" 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : { 
    "total" : { 
      "value" : 30, 
      "relation" : "eq" 
    }, 
    "max_score" : null, 
    "hits" : [ ] 
  }, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:03:20.000Z", 
          "key" : 1565615000000, 
          "doc_count" : 1 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:03:30.000Z", 
          "key" : 1565615010000, 
          "doc_count" : 1 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:03:40.000Z", 
          "key" : 1565615020000, 
          "doc_count" : 1 
        }, 
        ... 
      ] 
    } 
  } 
}

Für jede Klasse werden die folgenden Angaben zurückgegeben: unter key der Zeitstempel, unter key_as_string das Datum und die Uhrzeit in Klartext und unter doc_count die Zahl der Dokumente, die in der Klasse erfasst wurden.

Der doc_count-Wert ist 1, weil die Klassengröße dem Berichtszeitraum von Metricbeat entspricht. Ansonsten ist die Ausgabe nicht sehr hilfreich; für die eigentlichen Metrikwerte werden wir eine weitere Aggregation hinzufügen. An dieser Stelle gilt es zu entscheiden, um was für eine Aggregation es sich handeln soll – bei numerischen Werten sind avg, min und max gute Kandidaten –, aber solange wir es mit nur einem Dokument pro Klasse zu tun haben, ist es eigentlich egal, was wir auswählen. Dies wird im folgenden Beispiel demonstriert, in dem avg-, min- und max-Aggregationen zu den Werten der Metrik system.cpu.user.pct in Klassen mit einem Intervall von 10 Sekunden zurückgegeben werden.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "fixed_interval": "10s" 
      }, 
      "aggregations": { 
        "myActualCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myActualCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myActualCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:12:40.000Z", 
          "key" : 1565615560000, 
          "doc_count" : 1, 
          "myActualCpuUserMin" : { 
            "value" : 1.002 
          }, 
          "myActualCpuUserAvg" : { 
            "value" : 1.002 
          }, 
          "myActualCpuUserMax" : { 
            "value" : 1.002 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:12:50.000Z", 
          "key" : 1565615570000, 
          "doc_count" : 1, 
          "myActualCpuUserMin" : { 
            "value" : 0.866 
          }, 
          "myActualCpuUserAvg" : { 
            "value" : 0.866 
          }, 
          "myActualCpuUserMax" : { 
            "value" : 0.866 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Wie Sie sehen können, sind die Werte für myActualCpuUserMin, myActualCpuUserAvg und myActualCpuUserMax in jeder Klasse gleich. Wenn Sie also die Rohwerte einer Zeitreihe abrufen müssen, für die in regelmäßigen Abständen aktualisierte Werte vorliegen, können Sie dies mit einem Datumshistogramm erreichen.

In den meisten Fällen werden Sie aber nicht an jedem einzelnen Datenpunkt interessiert sein, vor allem nicht dann, wenn alle paar Sekunden eine Messung vorgenommen wird. Häufig ist es besser, Daten mit einer gröberen Granularität zu haben. So gibt es in einer Visualisierung nur eine begrenzte Menge von Pixeln zur Anzeige von Variationen in einer Zeitreihe, sodass Daten mit einer höheren Granularität beim Rendern einfach ausgesondert werden.

Zeitreihendaten werden häufig einem sogenannten „Downsampling“ unterzogen, bei dem mehrere Datenpunkte innerhalb eines bestimmten Zeitraums auf einen einzelnen Datenpunkt reduziert werden. In diesem Fall erfolgt das Downsampling, um die Daten auf eine Granularität zu reduzieren, die für die Anforderungen des jeweils nächsten Verarbeitungsschrittes angemessen ist. In unserem Serverüberwachungsbeispiel werden die Daten alle 10 Sekunden gemessen, aber für die meisten Zwecke würde es ausreichen, den Durchschnitt aller Werte innerhalb einer Minute zu haben. Zufälligerweise ist das Downsampling genau das, was eine Datumshistogramm-Aggregation tut, wenn sie mehr als ein Dokument pro Klasse findet und die richtigen verschachtelten Aggregationen genutzt werden.

Das folgende erste Beispiel fürs Downsampling zeigt das Ergebnis eines Datumshistogramms mit verschachtelten avg-, min- und max-Aggregationen über eine vollständige Klasse von 1 Minute Länge. Die Verwendung von calendar_interval statt von fixed_interval sorgt dafür, dass die Klassengrenzen auf volle Minuten festgelegt werden.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "calendar_interval": "1m" 
      }, 
      "aggregations": { 
        "myDownsampledCpuUserMax": { 
          "max": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuUserMin": { 
          "min": { 
            "field": "system.cpu.user.pct" 
          } 
        } 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:27:00.000Z", 
          "key" : 1565616420000, 
          "doc_count" : 4, 
          "myDownsampledCpuUserMax" : { 
            "value" : 0.927 
          }, 
          "myDownsampledCpuUserMin" : { 
            "value" : 0.6980000000000001 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8512500000000001 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:28:00.000Z", 
          "key" : 1565616480000, 
          "doc_count" : 6, 
          "myDownsampledCpuUserMax" : { 
            "value" : 0.838 
          }, 
          "myDownsampledCpuUserMin" : { 
            "value" : 0.5670000000000001 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.7040000000000001 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Wie Sie sehen können, sind die Werte für myActualCpuUserMin, myActualCpuUserAvg und myActualCpuUserMax jetzt unterschiedlich und hängen von der jeweils verwendeten Aggregation ab.

Welche Methode für das Downsampling verwendet wird, richtet sich nach der Metrik. Für den CPU-Prozentsatz eignet sich die avg-Aggregation über eine Minute, während für Metriken wie Warteschlangenlänge oder Systemlast eine max-Aggregation geeigneter erscheint.

An diesem Punkt ist es auch möglich, Elasticsearch ein paar grundlegende Rechenaufgaben übernehmen zu lassen. So kann es eine Zeitreihe berechnen, die in den Originaldaten nicht vorhanden ist. Wenn wir für den CPU-Prozentsatz mit der avg-Aggregation arbeiten, können wir in unserem Beispiel Angaben zur Nutzer-CPU, zur System-CPU und zur Summe aus Nutzer- und System-CPU, geteilt durch die CPU-Kerne gewinnen.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...},   # same as above 
  "size": 0, 
  "aggregations": { 
    "myDateHistogram": { 
      "date_histogram": { 
        "field": "@timestamp", 
        "calendar_interval": "1m" 
      }, 
      "aggregations": { 
        "myDownsampledCpuUserAvg": { 
          "avg": { 
            "field": "system.cpu.user.pct" 
          } 
        }, 
        "myDownsampledCpuSystemAvg": { 
          "avg": { 
            "field": "system.cpu.system.pct" 
          } 
        }, 
        "myCpuCoresMax": { 
          "max": { 
            "field": "system.cpu.cores" 
          } 
        }, 
        "myCalculatedCpu": { 
          "bucket_script": { 
            "buckets_path": { 
              "user": "myDownsampledCpuUserAvg", 
              "system": "myDownsampledCpuSystemAvg", 
              "cores": "myCpuCoresMax" 
            }, 
            "script": { 
              "source": "(params.user + params.system) / params.cores", 
              "lang": "painless" 
            } 
          } 
        } 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myDateHistogram" : { 
      "buckets" : [ 
        { 
          "key_as_string" : "2019-08-12T13:32:00.000Z", 
          "key" : 1565616720000, 
          "doc_count" : 2, 
          "myDownsampledCpuSystemAvg" : { 
            "value" : 0.344 
          }, 
          "myCpuCoresMax" : { 
            "value" : 8.0 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8860000000000001 
          }, 
          "myCalculatedCpu" : { 
            "value" : 0.15375 
          } 
        }, 
        { 
          "key_as_string" : "2019-08-12T13:33:00.000Z", 
          "key" : 1565616780000, 
          "doc_count" : 6, 
          "myDownsampledCpuSystemAvg" : { 
            "value" : 0.33416666666666667 
          }, 
          "myCpuCoresMax" : { 
            "value" : 8.0 
          }, 
          "myDownsampledCpuUserAvg" : { 
            "value" : 0.8895 
          }, 
          "myCalculatedCpu" : { 
            "value" : 0.15295833333333334 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Beispiel 2: Netzwerkverkehr – Term-Aggregation und abgeleitete Aggregationen

Ein etwas komplexeres Beispiel dafür, wie nützlich Elasticsearch-Aggregationen für Zeitreihendaten sein können, ist der Metricset system.network. Der relevante Teil eines system.network-Metricset-Dokuments sieht wie folgt aus:

{ 
... 
"system": { 
        "network": { 
            "in": { 
                "bytes": 37904869172, 
                "dropped": 32, 
                "errors": 0, 
                "packets": 32143403 
            }, 
            "name": "wlp4s0", 
            "out": { 
                "bytes": 6299331926, 
                "dropped": 0, 
                "errors": 0, 
                "packets": 13362703 
            } 
        } 
    } 
... 
}

Metricbeat sendet für jede Netzwerkschnittstelle im System ein Dokument. Diese Dokumente haben alle denselben Zeitstempel, aber unterschiedliche Werte für das Feld system.network.name, einen pro Netzwerkschnittstelle.

Jede weitere Aggregation muss über die Schnittstelle ausgeführt werden. Daher ändern wir die übergeordnete Datenhistogramm-Aggregation aus den vorherigen Beispielen in eine Terms-Aggregation über das Feld system.network.name.

Damit dies funktioniert, muss das Feld, über das aggregiert wird, als „keyword“-Feld gemappt worden sein. Wenn Sie mit der von Metricbeat bereitgestellten Standardindexvorlage arbeiten, ist dieses Mapping bereits fertig eingerichtet. Ist dies nicht der Fall, können Sie auf der Dokumentationsseite zu Metricbeat-Vorlagen nachlesen, was Sie für die Einrichtung tun müssen.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...}, # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          } 
        } 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "enp0s31f6", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "lo", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [...] 
          } 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 29, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T13:39:00.000Z", 
                "key" : 1565617140000, 
                "doc_count" : 1 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:40:00.000Z", 
                "key" : 1565617200000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:41:00.000Z", 
                "key" : 1565617260000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:42:00.000Z", 
                "key" : 1565617320000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:43:00.000Z", 
                "key" : 1565617380000, 
                "doc_count" : 6 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:44:00.000Z", 
                "key" : 1565617440000, 
                "doc_count" : 4 
              } 
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Wie schon beim CPU-Beispiel gibt die Datumshistogramm-Aggregation ohne verschachtelte Aggregation nur die Zahl der Dokumente (doc_count) zurück, was wenig hilfreich ist.

Die Felder für Bytes enthalten monoton steigende Werte. Der Wert dieser Felder enthält die Zahl der seit dem letzten Start der Maschine gesendeten oder empfangenen Bytes und steigt daher mit jeder Messung. In diesem Fall ist die korrekte verschachtelte Aggregation max, das heißt, der durch Downsampling reduzierte Wert enthält den höchsten und damit den zuletzt gemessenen Wert während des Klassenintervalls.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          }, 
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            }, 
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          ... 
        }, 
        { 
          "key" : "enp0s31f6", 
          ... 
        }, 
        { 
          "key" : "lo", 
          ... 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 30, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T13:50:00.000Z", 
                "key" : 1565617800000, 
                "doc_count" : 2, 
                "myNetworkInBytesMax" : { 
                  "value" : 2.991659837E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.46578365E8 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T13:51:00.000Z", 
                "key" : 1565617860000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 2.992027006E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.46791988E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 367169.0, 
                  "normalized_value" : 6119.483333333334 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 213623.0, 
                  "normalized_value" : 3560.383333333333 
                } 
              }, 
              ... 
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Um anhand des monoton steigenden Zählerwerts die Geschwindigkeit der Datenübertragung in Byte pro Sekunde zu erhalten, kann eine abgeleitete Aggregation genutzt werden. Wenn dieser Aggregation der optionale Parameter unit übergeben wird, gibt sie im Feld normalized_value den gewünschten Wert pro Einheit zurück.

Beispiel-Abfrage:

GET metricbeat-*/_search 
{ 
  "query": {...},  # same as above 
  "size": 0, 
  "aggregations": { 
    "myNetworkInterfaces": { 
      "terms": { 
        "field": "system.network.name", 
        "size": 50 
      }, 
      "aggs": { 
        "myDateHistogram": { 
          "date_histogram": { 
            "field": "@timestamp", 
            "calendar_interval": "1m" 
          }, 
          "aggregations": { 
            "myNetworkInBytesMax": { 
              "max": { 
                "field": "system.network.in.bytes" 
              } 
            }, 
            "myNetworkInBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkInBytesMax", 
                "unit": "1s" 
              } 
            }, 
            "myNetworkOutBytesMax": { 
              "max": { 
                "field": "system.network.out.bytes" 
              } 
            }, 
            "myNetworkoutBytesPerSecond": { 
              "derivative": { 
                "buckets_path": "myNetworkOutBytesMax", 
                "unit": "1s" 
              } 
            } 
          } 
        } 
      } 
    } 
  } 
}

Beispiel-Antwort auf die Abfrage:

{ 
  ..., 
  "hits" : {...}, 
  "aggregations" : { 
    "myNetworkInterfaces" : { 
      "doc_count_error_upper_bound" : 0, 
      "sum_other_doc_count" : 0, 
      "buckets" : [ 
        { 
          "key" : "docker0", 
          ... 
        }, 
        { 
          "key" : "enp0s31f6", 
          ... 
        }, 
        { 
          "key" : "lo", 
          ... 
        }, 
        { 
          "key" : "wlp61s0", 
          "doc_count" : 30, 
          "myDateHistogram" : { 
            "buckets" : [ 
              { 
                "key_as_string" : "2019-08-12T14:07:00.000Z", 
                "key" : 1565618820000, 
                "doc_count" : 4, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.030494669E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56084749E8 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T14:08:00.000Z", 
                "key" : 1565618880000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.033793744E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56323416E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 3299075.0, 
                  "normalized_value" : 54984.583333333336 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 238667.0, 
                  "normalized_value" : 3977.7833333333333 
                } 
              }, 
              { 
                "key_as_string" : "2019-08-12T14:09:00.000Z", 
                "key" : 1565618940000, 
                "doc_count" : 6, 
                "myNetworkInBytesMax" : { 
                  "value" : 3.037045046E9 
                }, 
                "myNetworkOutBytesMax" : { 
                  "value" : 5.56566282E8 
                }, 
                "myNetworkInBytesPerSecond" : { 
                  "value" : 3251302.0, 
                  "normalized_value" : 54188.36666666667 
                }, 
                "myNetworkoutBytesPerSecond" : { 
                  "value" : 242866.0, 
                  "normalized_value" : 4047.766666666667 
                } 
              }, 
              ...   
            ] 
          } 
        }, 
        ... 
      ] 
    } 
  } 
}

Wenn Sie möchten, können Sie all dies mit Ihrem eigenen Cluster nachvollziehen. Sollten Sie noch kein Cluster haben, können Sie den Elasticsearch Service auf Elastic Cloud kostenlos ausprobieren oder die Standarddistribution des Elastic Stack herunterladen. Mit Metricbeat ist es ganz einfach, Daten aus Ihren Systemen zu senden, damit Sie sie schnell und gewinnbringend abfragen können!