Wie man Daten über Apache Airflow in Elasticsearch ingestiert

Erfahren Sie, wie Sie Daten mithilfe von Apache Airflow in Elasticsearch einlesen.

Mit Elasticsearch können Sie Daten schnell und flexibel indexieren. Testen Sie es kostenlos in der Cloud oder führen Sie es lokal aus, um zu erfahren, wie einfach die Indizierung sein kann.

Was ist Apache Airflow?

Apache Airflow ist eine Plattform, die zum Erstellen, Planen und Überwachen von Arbeitsabläufen entwickelt wurde. Es dient zur Orchestrierung von ETL-Prozessen, Datenpipelines und anderen komplexen Arbeitsabläufen und bietet Flexibilität und Skalierbarkeit. Die visuelle Benutzeroberfläche und die Echtzeit-Überwachungsfunktionen machen das Pipeline-Management zugänglicher und effizienter und ermöglichen es Ihnen, den Fortschritt und die Ergebnisse Ihrer Ausführungen zu verfolgen. Nachfolgend sind die vier Hauptpfeiler aufgeführt:

  • Dynamisch: Pipelines werden in Python definiert, was eine dynamische und flexible Workflow-Generierung ermöglicht.
  • Erweiterbar: Airflow lässt sich in eine Vielzahl von Umgebungen integrieren, benutzerdefinierte Operatoren können erstellt und spezifischer Code nach Bedarf ausgeführt werden.
  • Elegant: Pipelines werden sauber und explizit geschrieben.
  • Skalierbar: Dank seiner modularen Architektur nutzt es eine Message Queue, um eine beliebige Anzahl von Workern zu orchestrieren.

In der Praxis kann Airflow in Szenarien wie den folgenden eingesetzt werden:

  • Datenimport: Orchestrieren Sie die tägliche Datenaufnahme in eine Datenbank wie Elasticsearch.
  • Protokollüberwachung: Verwaltung der Erfassung und Verarbeitung von Protokolldateien, die anschließend in Elasticsearch analysiert werden, um Fehler oder Anomalien zu identifizieren.
  • Integration mehrerer Datenquellen: Informationen aus verschiedenen Systemen (APIs, Datenbanken, Dateien) werden in einer einzigen Ebene in Elasticsearch kombiniert, was die Suche und Berichterstellung vereinfacht.

Verständnis von gerichteten azyklischen Graphen (DAGs) in der Luftströmung

In Airflow werden Workflows durch DAGs (gerichtete azyklische Graphen) dargestellt. Ein DAG ist eine Struktur, die die Reihenfolge definiert, in der Aufgaben ausgeführt werden. Die Hauptmerkmale von DAGs sind:

  • Zusammensetzung aus unabhängigen Aufgaben: Jede Aufgabe stellt eine Arbeitseinheit dar und ist so konzipiert, dass sie unabhängig ausgeführt werden kann.
  • Sequenzierung: Die Reihenfolge, in der die Aufgaben ausgeführt werden, ist im DAG explizit definiert.
  • Wiederverwendbarkeit: DAGs sind so konzipiert, dass sie wiederholt ausgeführt werden können, was die Prozessautomatisierung erleichtert.

Luftstromkomponenten

Das Airflow-Ökosystem besteht aus mehreren Komponenten, die zusammenarbeiten, um Aufgaben zu orchestrieren:

  • Scheduler: Verantwortlich für die Planung von DAGs und die Zuweisung von Aufgaben zur Ausführung an die Worker.
  • Ausführender: Steuert die Ausführung von Aufgaben und delegiert diese an Mitarbeiter.
  • Webserver: Bietet eine grafische Benutzeroberfläche zur Interaktion mit DAGs und Tasks.
  • Dags-Ordner: Ordner, in dem wir in Python geschriebene DAGs speichern.
  • Metadaten: Datenbank, die als Repository für das Tool dient und vom Scheduler und Executor zur Speicherung des Ausführungsstatus verwendet wird.

Apache Airflow und Elasticsearch

Wir werden die Verwendung von Apache Airflow und Elasticsearch zur Orchestrierung von Aufgaben und zur Indizierung von Ergebnissen in Elasticsearch demonstrieren. Ziel dieser Demonstration ist es, eine Aufgabenpipeline zu erstellen, um Datensätze in einem Elasticsearch-Index zu aktualisieren. Dieser Index enthält eine Datenbank mit Filmen, in der Benutzer Bewertungen abgeben und andere Bewertungen vergeben können. Stellt man sich ein Szenario mit Hunderten von täglichen Bewertungen vor, ist es notwendig, die Bewertungsstatistik stets aktuell zu halten. Hierfür wird ein DAG entwickelt, der täglich ausgeführt wird und für das Abrufen der neuen konsolidierten Ratings sowie die Aktualisierung der Datensätze im Index zuständig ist.

Im DAG-Ablauf gibt es eine Aufgabe zum Abrufen der Bewertungen, gefolgt von einer Aufgabe zur Validierung der Ergebnisse. Falls die Daten nicht vorhanden sind, wird der DAG zu einer Fehleraufgabe weitergeleitet. Andernfalls werden die Daten in Elasticsearch indexiert. Ziel ist es, das Bewertungsfeld von Filmen in einem Index zu aktualisieren, indem die Bewertungen mithilfe einer Methode abgerufen werden, die den Mechanismus zur Berechnung der Punktzahlen beinhaltet.

Verwendung von Apache Airflow und Elasticsearch mit Docker

Um eine containerisierte Umgebung zu erstellen, verwenden wir Apache Airflow mit Docker. Folgen Sie den Anweisungen im Leitfaden „Airflow in Docker ausführen“, um Airflow praktisch einzurichten.

Was Elasticsearch betrifft, werde ich einen Cluster auf Elastic Cloud verwenden, aber wenn Sie es vorziehen, können Sie Elasticsearch auch mit Docker konfigurieren. Es wurde bereits ein Index erstellt, der einen Filmkatalog mit den indexierten Filmdaten enthält. Das Feld „Bewertung“ dieser Filme wird aktualisiert.

Erstellung des DAG

Nach der Installation via Docker wird eine Ordnerstruktur erstellt, die unter anderem den Ordner „dags“ enthält. In diesem Ordner müssen wir unsere DAG-Dateien ablegen, damit Airflow sie erkennt.

Zuvor müssen wir sicherstellen, dass die notwendigen Abhängigkeiten installiert sind. Hier sind die Abhängigkeiten für dieses Projekt:

Wir werden die Datei update_ratings_movies.py erstellen und mit der Codierung der Aufgaben beginnen.

Nun importieren wir die benötigten Bibliotheken:

Wir werden den ElasticsearchPythonHook verwenden, eine Komponente, die die Integration zwischen Airflow und einem Elasticsearch-Cluster vereinfacht, indem sie die Verbindung und die Verwendung externer APIs abstrahiert.

Als Nächstes definieren wir den DAG und geben seine Hauptargumente an:

  • dag_id: der Name des DAG.
  • start_date: wann der DAG startet.
  • schedule: definiert die Periodizität (in unserem Fall täglich).
  • doc_md: Dokumentation, die importiert und in der Airflow-Oberfläche angezeigt wird.

Definition der Aufgaben

Nun definieren wir die Aufgaben des DAG. Die erste Aufgabe besteht darin, die Daten zur Filmbewertung abzurufen. Wir werden den PythonOperator verwenden, wobei task_id auf 'get_movie_ratings' gesetzt ist. Der Parameter python_callable ruft die Funktion auf, die für das Abrufen der Bewertungen zuständig ist.

Als nächstes müssen wir überprüfen, ob die Ergebnisse gültig sind. Hierfür verwenden wir eine Bedingung mit einem BranchPythonOperator. Die task_id wird zu 'validate_result', und die python_callable ruft die Validierungsfunktion auf. Der Parameter op_args wird verwendet, um das Ergebnis der vorherigen Aufgabe, 'get_movie_ratings', an die Validierungsfunktion zu übergeben.

Wenn die Validierung erfolgreich ist, werden wir die Daten aus der 'get_movie_ratings' -Aufgabe nehmen und in Elasticsearch indizieren. Um dies zu erreichen, erstellen wir eine neue Aufgabe, 'index_movie_ratings', die den PythonOperator verwendet. Der Parameter op_args übergibt die Ergebnisse der Aufgabe 'get_movie_ratings' an die Indexierungsfunktion.

Wenn die Validierung einen Fehler anzeigt, fährt der DAG mit einer Fehlerbenachrichtigungsaufgabe fort. In diesem Beispiel geben wir einfach eine Meldung aus, aber in einem realen Szenario könnten wir Warnmeldungen konfigurieren, um über die Fehler zu informieren.

Abschließend definieren wir die Aufgabenabhängigkeiten und stellen sicher, dass sie in der richtigen Reihenfolge ausgeführt werden:

Hier folgt nun der vollständige Code unseres DAG:

Visualisierung der DAG-Ausführung

In der Apache Airflow-Oberfläche können wir die Ausführung der DAGs visualisieren. Gehen Sie einfach auf die Registerkarte „DAGs“ und suchen Sie den von Ihnen erstellten DAG.

Im Folgenden können wir die Ausführung der Aufgaben und ihre jeweiligen Status visualisieren. Durch die Auswahl einer Ausführung für ein bestimmtes Datum können wir auf die Protokolle der einzelnen Aufgaben zugreifen. Beachten Sie, dass wir in der index_movie_ratings -Aufgabe die Indexierungsergebnisse im Index sehen können und dass sie erfolgreich abgeschlossen wurde.

In den anderen Registerkarten können Sie auf zusätzliche Informationen zu den Aufgaben und dem DAG zugreifen, die Ihnen bei der Analyse und Lösung potenzieller Probleme helfen.

Fazit

In diesem Artikel haben wir gezeigt, wie man Apache Airflow mit Elasticsearch integriert, um eine Datenerfassungslösung zu erstellen. Wir haben gezeigt, wie man den DAG konfiguriert, die Aufgaben definiert, die für das Abrufen, Validieren und Indizieren von Filmdaten zuständig sind, und wie man die Ausführung dieser Aufgaben in der Airflow-Oberfläche überwacht und visualisiert.

Dieser Ansatz lässt sich leicht an verschiedene Datentypen und Arbeitsabläufe anpassen, wodurch Airflow zu einem nützlichen Werkzeug für die Orchestrierung von Datenpipelines in unterschiedlichen Szenarien wird.

Referenzen

Apache AirFlow

https://airflow.apache.org/

Apache Airflow mit Docker installieren

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python Hook

https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python-Operator

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

Häufige Fragen

Was ist Apache Airflow?

Apache Airflow ist eine Plattform, die zum Erstellen, Planen und Überwachen von Arbeitsabläufen entwickelt wurde.

Wozu dient Apache Airflow?

Apache Airflow dient zur Orchestrierung von ETL-Prozessen, Datenpipelines und anderen komplexen Arbeitsabläufen und bietet Flexibilität und Skalierbarkeit.

Was sind die Hauptkomponenten von Apache Airflow?

Die Hauptkomponenten von Airflow sind: Scheduler, Executor, Web Server, Dags Folder und Metadata.

Lässt sich Apache Airflow mit Elasticsearch integrieren?

Ja, Apache Airflow kann mit Elasticsearch integriert werden. Beispielsweise können Sie Apache Airflow verwenden, um Aufgaben zu orchestrieren und Ergebnisse in Elasticsearch zu indizieren.

Was ist ein DAG in Apache Airflow?

In Airflow werden Workflows durch DAGs (gerichtete azyklische Graphen) dargestellt. Ein DAG ist eine Struktur, die die Reihenfolge definiert, in der Aufgaben ausgeführt werden.

Zugehörige Inhalte

Sind Sie bereit, hochmoderne Sucherlebnisse zu schaffen?

Eine ausreichend fortgeschrittene Suche kann nicht durch die Bemühungen einer einzelnen Person erreicht werden. Elasticsearch wird von Datenwissenschaftlern, ML-Ops-Experten, Ingenieuren und vielen anderen unterstützt, die genauso leidenschaftlich an der Suche interessiert sind wie Sie. Lasst uns in Kontakt treten und zusammenarbeiten, um das magische Sucherlebnis zu schaffen, das Ihnen die gewünschten Ergebnisse liefert.

Probieren Sie es selbst aus