22 Juni 2016 Engineering

Kafka für den Elastic Stack, Teil 2

Von Suyog Rao

Willkommen zu Teil 2 unseres mehrteiligen Beitrags über Apache Kafka und den Elastic Stack. In unserem letzten Beitrag haben wir Anwendungsfälle von Kafka für den Elastic Stack vorgestellt und mit euch unser Wissen zur Planung eurer Systeme auf zeit- und nutzerbasierte Datenströme geteilt. In diesem Beitrag werden wir uns auf die operativen Aspekte konzentrieren: Ihr bekommt Tipps zum Betrieb von Kafka und Logstash in Produktionsumgebungen für die Verarbeitung riesiger Datenmengen.

Kapazitätsplanung

Bevor wir gleich in die Materie einsteigen, hier noch einmal der Hinweis zu den Versionen, über die wir reden werden: Kafka 0.8 und Logstash 2.x, die aktuelle stabile Version. Es gibt neuere Versionen von Kafka – 0.9 und seit kurzem 0.10 – aber die hier besprochenen Grundprinzipien können auf alle Kafka-Versionen angewendet werden. Dann legen wir auch schon los und erklären die verschiedenen Systeme, die hier wirken:

Apache ZooKeeper: Kafka ist abhängig von ZooKeeper (ZK) ; Broker benötigen ZooKeeper zur Bildung von Clustern, die Topic-Konfiguration wird in ZK-Nodes gespeichert etc. Außerdem sind in Logstash ab Version 2.x die Input-Offsets nach der Bestätigung in ZK gespeichert. Neuere Versionen von Kafka haben die Clients – Verbraucher und Produzenten – von der Kommunikation mit ZooKeeper entkoppelt. In Kafka 0.9 und 0.10 werden die Offsets standardmäßig in Topics und nicht mehr in ZK gespeichert. In jedem Fall brauchen Sie ZooKeeper immer noch, um Kafka-Broker auszuführen. Allgemein empfehlen wir, 3 ZK-Instanzen auf jeweils separater Hardware laufen zu lassen, um eine Quorumskonfiguration einzurichten. Weitere Informationen zum Betrieb von ZK findet ihr in diesem ausgezeichneten Abschnitt in der Kafka-Dokumentation. Unserer Erfahrung nach braucht ZK keinen Babysitter, wenn die Instanzen erst einmal eingerichtet wurden. Achtet einfach darauf, dass die Instanzen laufen und überwacht werden.

Kafka Brokers: Die Anzahl der benötigten Kafka-Broker hängt üblicherweise von der Speicher- und Replikationsstrategie für die Daten ab. Je mehr Broker ihr hinzufügt, desto mehr Daten könnt ihr in Kafka speichern. Was die Ressourcen angeht liegt bei Kafka der Flaschenhals in der IO. Die Leistung wird durch die Plattengeschwindigkeit und den Dateisystem-Cache begrenzt – gute SSD-Laufwerke und Dateisystem-Caches unterstützen Leseraten von Millionen Nachrichten pro Sekunde mit links. Zur Überwachung dieser Informationen könnt ihr topbeat verwenden.

Logstash: Wie viele Logstash-Instanzen braucht man, um die Daten in Kafka zu verarbeiten? Dafür gibt es wirklich keine magische Zahl, denn das hängt von viel zu vielen Variablen ab. Erst müsst ihr Fragen beantworten wie: Wie viele Filter habe ich? Wie teuer sind meine Filter? (Schnell hat man ein komplexes Grok-Muster mit mehreren Bedingungen zur Datenverarbeitung an der Backe!) Welches Datenvolumen erwarte ich? Welche Outputs liefere ich? Wie ihr seht gibt es viele Informationen, die wir brauchen, bevor wir eine Zahl angeben können. Oftmals sind es die Outputs (externe Systeme), auf die ihr euch bei der Kapazitätsplanung konzentrieren müsst, nicht Logstash! Dabei könnt ihr Logstash und Elasticsearch leicht horizontal skalieren. Deshalb würden wir empfehlen, klein anzufangen und bei wachsenden Datenbedürfnissen weitere Nodes oder neue LS-Instanzen hinzuzufügen.

Insbesondere könnt ihr für Daten in Kafka, die Logstash konsumiert, mehrere Instanzen zu Verbrauchergruppen zusammenfassen. Die Gruppen teilen sich die Last und die Instanzen verarbeiten die Daten exklusiv, d. h. Nachrichten werden nur einmal von einem Client in der Gruppe konsumiert. Dieses Design fügt sich gut in unseren ursprünglichen Vorschlag ein – klein anfangen und iterativ skalieren. Mit Topics könnt ihr euren Workflow so auslegen, dass Daten, die komplexere Transformationen erfordern oder die in einem langsameren Output gespeichert werden müssen, von anderen, schnelllebigen Daten isoliert werden. In Logstash kann ein einziger langsamer Output alle anderen Outputs blockieren, die für die nachfolgende Ausführung konfiguriert sind.

Elasticsearch: Wie bereits erwähnt ist Elasticsearch wirklich elastisch und lässt sich leicht hochskalieren. Die Kapazitätsplanung für Elasticsearch würde einen eigenen Blogeintrag füllen und den Rahmen dieses Artikels sprengen. Wir empfehlen euch, folgende Beiträge zu lesen, in denen die Prinzipien der Skalierung und Größenbestimmung von Elasticsearch erklärt werden Größenbestimmung für Elasticsearch, Performance-Aspekte für die Indexierung mit Elasticsearch und andere.    

Datenspeicherung

Wenn eurer Kafka-Instanz der Speicherplatz ausgeht, ist möglicherweise die Aufbewahrungszeit für die Kafka-Logs zu lang. In Kafka könnt ihr die Datenspeicherung anhand von zwei Kriterien konfigurieren: Alter und Größe, mit den Broker-Einstellungen log.retention.bytes und log.retention.hours. Wenn eines dieser Kriterien erfüllt wird, beginnt der Kafka-Broker mit dem Löschen von Nachrichten. Dabei fängt er mit der ältesten an, egal ob Logstash diese schon konsumiert hat oder nicht.

Es ist natürlich verlockend, die Datenwiederherstellung und -speicherung für Elasticsearch mit den Aufbewahrungs-Tools von Kafka zu designen. Unserer Erfahrung nach machen sich aber Tools wie Curator am besten zur Verwaltung der zeitbasierten Indizes von Elasticsearch. Außerdem sollte natürlich eine Snapshot-Strategie für die Wiederherstellung von Indizes nach katastrophalen Ausfällen konfiguriert werden. Am häufigsten liegen die Daten in Kafka als rohe, ungefilterte Inhalte mit mehreren Zielen vor; deshalb sollten sie nicht eng an eine nachgelagerte Komponente gebunden werden.

Offset-Verwaltung und Nachrichtenzustellungsgarantien

Aus der Kafka-Dokumentation:

Die Nachrichten in den Partitionen erhalten jeweils eine sequenzielle ID-Nummer namens Offset zugewiesen, die jede Nachricht in der Partition eindeutig kennzeichnet. Der Offset wird vom Verbraucher gesteuert: Normalerweise steigert ein Verbraucher seinen Offset linear, während er Nachricht für Nachricht liest.

Der Kafka-Input behält die Offset-Informationen mit ZooKeeper im Auge. Wenn Logstash die Nachrichten aus dem Topic abruft und sie verarbeitet, meldet das Programm das regelmäßig an ZK. Dieser Prozess wird „Check-Pointing“ oder „Committing“ (Melden) genannt. Standardmäßig führt Logstash den Check-Point-Prozess an ZK einmal pro Minute durch. Ihr könnt die Häufigkeit mit der Einstellung auto_commit_interval_ms kontrollieren. Beachtet, dass längere Zeiten bei dieser Einstellung zu Datenverlusten führen können, wenn Logstash gewaltsam gestoppt wird oder der Prozess abstürzt. Andererseits erhöht natürlich ein niedrigerer Zeitwert die Schreibvorgänge pro Client, womit dann der ZK-Cluster zu kämpfen hat.

Wenn ihr Logstash neu startet, liest das Programm zunächst die in ZK gespeicherten Offset-Informationen und beginnt, Nachrichten vom letzten Commit-Point abzurufen. Kafka ist auf die „Mindestens einmal“ -Semantik ausgelegt – Nachrichten gehen garantiert nicht verloren, können aber erneut zugestellt werden. Das heißt, es kann Szenarien geben, in denen Logstash abstürzt, während sich der Offset noch im Speicher befindet und nicht gemeldet wurde. Das kann dazu führen, dass Nachrichten erneut zugestellt, d. h. dupliziert werden. Wenn das für euren Anwendungsfall von Belang ist, könnt ihr die potenzielle Duplizierung umgehen, indem ihr eindeutige IDs in einem Feld für eure Nachrichten erzeugt/verwendet. Egal ob ihr euren eigenen Code zur Erzeugung dieser IDs oder den uuid -Filter in Logstash verwendet: Ihr müsst das erledigen, bevor die Nachrichten in Kafka ankommen. Auf der „Versandseite“ von Logstash könnt ihr diese Event-ID der Option document_id im Elasticsearch-Output-Plugin zuordnen. Das bedeutet, dass Elasticsearch das indexierte Dokument mit derselben ID überschreibt, was generell der Erzeugung mehrerer Dokumente mit demselben Inhalt vorgezogen wird!

Das ist auch nützlich, wenn ihr Inhalte erneut wiedergeben müsst, weil nachgelagert Daten verloren gegangen sind. Ihr könntet eine andere Verbrauchergruppe benutzen, um Daten in ihrem eigenen Tempo erneut wiederzugeben.

input {
  kafka {
    zk_connect => "kafka:2181"
    group_id => "logstash"
    topic_id => "apache_logs"
    consumer_threads => 16
  }
}
....
output {
  elasticsearch {
    document_id => "%{my_uuid}"
  }
}

Dabei ist my_uuid ein vorhandenes Feld im Event.

Überwachung: Wie weiß ich, ob eine Verzögerung vorliegt?

Eines der wichtigsten Dinge, die ihr bei der Nutzung von Kafka immer im Auge behalten solltet, ist die Anzahl der Nachrichten in der Warteschlange, die noch von Logstash konsumiert werden müssen. Es gibt zahlreiche Tools zur Überwachung dieser Informationen, im Folgenden führen wir einige Optionen an:

CLI-Tool im Verbund mit Kafka

Einfaches Befehlszeilen-Tool zur Offset-Prüfung. Ihr könnt es regelmäßig mit einem Cronjob ausführen und euch mit eurem Lieblings-Benachrichtigungstool alarmieren lassen.

/usr/bin/kafka-consumer-offset-checker --group logstash --topic apache_logs --zookeeper localhost:2181

Musterantwort:

Group    Topic       Pid Offset  logSize Lag     Owner
logstash apache_logs 0   145833  300000  154167  none
logstash apache_logs 1   145720  300000  154280  none
logstash apache_logs 2   145799  300000  154201  none
logstash apache_logs 3   146267  300000  153733  none

Die Spalte Lag (Verzögerung) sagt euch, um wie viele Nachrichten ihr hinterherhinkt.

JMX

Kafka lässt sich einfach per JMX mit JConsole überwachen. Zur Einbindung von JMX für die Logstash-Überwachung könnt ihr diese zusätzlichen Java-Optionen einstellen, bevor ihr Logstash startet:

export LS_JAVA_OPTS="
 -Dcom.sun.management.jmxremote.authenticate=false
 -Dcom.sun.management.jmxremote.port=3000
 -Dcom.sun.management.jmxremote.ssl=false
 -Dcom.sun.management.jmxremote.authenticate=false"

Wenn euer System auf AWS läuft, müsst ihr den externen Hostnamen oder die IP des Servers benutzen.

-Djava.rmi.server.hostname=ec2-107-X-X-X.compute-1.amazonaws.com

Elastic Stack

Natürlich könnt ihr Kafka auch mit dem Elastic Stack selbst überwachen. Das ist selbstverständlich die von uns favorisierte Option! Für diesen Spezialfall nutzen wir den eigens entwickelten Beat namens Kafkabeat, der von unserem geschätzten Dale McDiarmid programmiert wurde. Dieser Beat sammelt den Offset und andere Topic-Informationen und speichert sie in Elasticsearch. Ihr könnt den Verbraucher-Lag dann mit Kibana analysieren. Zusammen mit topbeat , das Statistiken auf Systemebene wie Schreib- und Lesedurchsatz, CPU und Speicher erfasst, haben wir eine leistungsstarke Überwachungslösung für Kafka. Jetzt habt ihr also alle Daten an einem Ort und könnt euren Chef davon überzeugen, diese alten Festplatten durch brandneue SSDs zu ersetzen! Und mit 5.0.0 wird es sogar noch besser. All diese kritischen Informationen – Überwachungdaten auf Anwendungs- und Systemebene – werden in einem Beat namens Metricbeat zusammengefasst. Klar soweit?

Okay, nun aber zurück zu Kafkabeat. So legt ihr damit los:

  1. Klont https://github.com/gingerwizard/kafkabeat
  2. Führt make im Kafkabeat-Verzeichnis aus.
  3. Implementiert Kafkabeat in euren Kafka-Brokern und führt es aus mit ./kafkabeat -c kafkabeat.yml
Damit werden die Offset-Informationen aus allen Kafka-Topics für diesen Broker gesammelt und mit folgender Dokumentenstruktur in Elasticsearch indexiert.
"@timestamp": "2016-06-22T01:00:43.033Z",
  "beat": {
    "hostname": "Suyogs-MBP-2",
    "name": "Suyogs-MBP-2"
  },
  "type": "consumer",
  "partition": 0,
  "topic": "apache_logs_test",
  "group": "logstash",
  "offset": 3245
  "lag": 60235
}

Sobald die Daten in Elasticsearch sind, gestaltet sich die Visualisierung mit Kibana ganz einfach. Ich habe den taufrischen Release Kibana 5.0.0-alpha3 genutzt, um ein Dashboard zu erstellen, mit dem ich das Feld Verbraucher-Lag im Verlauf des Zeitstempels als Diagramm darstellen kann.

Kibana_Kafka2.png

Kafka Manager

Das ist ein Open-Source-UI-Tool zur Verwaltung von Kafka in seiner Gesamtheit. Ihr könnt Topics erstellen, Kennzahlen verfolgen, Offsets verwalten und mehr. Beachtet, dass dieses Tool eine Weile zum Kompilieren und Bauen braucht. Wenn ihr aber eine End-to-End-Verwaltungslösung für Kafka braucht, solltet ihr dieses Tool ausprobieren. Nach dem Start des Kafka-Managers befolgt ihr einfach die Anweisungen zur Erstellung eines neuen Clusters für die Überwachung, indem ihr diesen an eure ZK-Instanz verweist.

Verbraucheransicht

kafka UI.png

Topic-Ansicht

Kafka UI 2.png

Fazit

In diesem Beitrag haben wir Tipps zum Betrieb von Kafka und Logstash geliefert, damit ihr Daten aus mehreren Quellen in Elasticsearch aufnehmen könnt. Bald gibt es noch mehr!

Im vergangenen Jahr veröffentlichte Kafka die Version 0.9.0 und kürzlich 0.10.0, die nur so strotzt vor neuen Funktionen wie integrierten Sicherheits-Features, neuer Verbraucherimplementierung, Datenquoten und mehr. Wir haben den Input und Output von Logstash angepasst, damit ihr diese Funktionen mit Logstash nutzen könnt! Im nächsten Beitrag besprechen wir die neuen Features in Kafka, insbesondere die End-to-End-Sicherheit mit Kafka und dem Elastic Stack.

Bis zum nächsten Mal!