如何通过 Apache Airflow 将数据采集到 Elasticsearch

了解如何通过 Apache Airflow 将数据摄取到 Elasticsearch。

Elasticsearch 允许您快速且灵活地对数据构建索引。在云端免费试用或在本地运行,看看构建索引有多简单。

什么是阿帕奇气流?

Apache Airflow 是一个用于创建、调度和监控工作流的平台。它用于协调 ETL 流程、数据管道和其他复杂的工作流程,具有灵活性和可扩展性。它的可视化界面和实时监控功能使管道管理更方便、更高效,让您可以跟踪执行的进度和结果。以下是其四大支柱:

  • 动态: 管道是用 Python 定义的,可以动态、灵活地生成工作流程。
  • 可扩展性:Airflow 可与各种环境集成,可创建自定义操作符,并可根据需要执行特定代码。
  • 优雅:管道的编写方式简洁明了。
  • 可扩展:它的模块化架构使用消息队列来协调任意数量的工作者。

在实际应用中,气流可用于以下情况:

  • 数据导入: 协调将数据导入 Elasticsearch 等数据库的日常工作。
  • 日志监控:管理日志文件的收集和处理,然后在 Elasticsearch 中进行分析,以识别错误或异常。
  • 整合多个数据源:将来自不同系统(应用程序接口、数据库、文件)的信息整合到 Elasticsearch 的单层中,简化搜索和报告。

了解 Airflow 中的 DAG(有向无环图

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。DAG 的主要特点是

  • 由独立任务组成:每个任务代表一个工作单元,可独立执行。
  • 排序: 任务的执行顺序在 DAG 中明确定义。
  • 可重用性:DAG 设计为可重复执行,有利于流程自动化。

气流组件

Airflow 生态系统由多个组件组成,这些组件共同协调任务:

  • 调度器:负责调度 DAG 和发送任务供工作者执行。
  • 执行者:管理任务的执行,将任务分配给员工。
  • 网络服务器:提供与 DAG 和任务交互的图形界面。
  • Dags 文件夹:存放用 Python 编写的 DAG 的文件夹。
  • 元数据:作为工具存储库的数据库,由调度器和执行器用来存储执行状态。

Apache Airflow 和 Elasticsearch

我们将演示如何使用 Apache Airflow 和 Elasticsearch 来协调任务并在 Elasticsearch 中对结果进行索引。本演示的目的是创建一个任务流水线,以更新 Elasticsearch 索引中的记录。该索引包含一个电影数据库,用户可在其中对电影进行评分和分配等级。假设每天有数百个收视率,那么就有必要不断更新收视率记录。为此,将开发一个每日执行的 DAG,负责检索新的综合评级并更新索引中的记录。

在 DAG 流程中,我们将有一个获取评级的任务,然后是一个验证结果的任务。如果数据不存在,DAG 将被定向到故障任务。否则,数据将在 Elasticsearch 中编入索引。我们的目标是通过一种方法,利用负责计算分数的机制来检索评分,从而更新索引中电影的评分字段。

将 Apache Airflow 和 Elasticsearch 与 Docker 结合使用

为了创建容器化环境,我们将使用带有 Docker 的 Apache Airflow。请按照 "Running Airflow in Docker" 指南中的说明实际设置 Airflow。

至于 Elasticsearch,我将使用 Elastic Cloud 上的集群,但如果你愿意,也可以使用 Docker 配置 Elasticsearch。已经创建了一个包含电影目录的索引,索引中包含电影数据。这些电影的 "评分 "字段将被更新。

创建 DAG

通过 Docker 安装后,将创建一个文件夹结构,其中包括 dags 文件夹,我们必须将 DAG 文件放在这里,Airflow 才能识别它们。

在此之前,我们需要确保安装了必要的依赖项。以下是此项目的依赖项

我们将创建文件update_ratings_movies.py 并开始任务编码。

现在,让我们导入必要的库:

我们将使用ElasticsearchPythonHook,这是一个通过抽象连接和使用外部 API 来简化 Airflow 与 Elasticsearch 集群之间集成的组件。

接下来,我们定义 DAG,说明其主要参数:

  • dag_id:DAG 的名称。
  • start_date:DAG 启动时间。
  • schedule:定义周期(本例中为每日)。
  • doc_md文件:将被导入并显示在 Airflow 界面中的文件。

确定任务

现在,让我们来定义 DAG 的任务。第一个任务将负责检索电影分级数据。我们将使用PythonOperator,并将task_id 设置为'get_movie_ratings'python_callable 参数将调用负责获取评级的函数。

接下来,我们需要验证结果是否有效。为此,我们将使用一个带有BranchPythonOperator 的条件。task_id 将是'validate_result' ,而python_callable 将调用验证函数。op_args 参数将用于把上一个任务'get_movie_ratings' 的结果传递给验证函数。

如果验证成功,我们将从'get_movie_ratings' 任务中获取数据,并将其索引到 Elasticsearch 中。为此,我们将创建一个新任务'index_movie_ratings' ,它将使用PythonOperatorop_args 参数将把'get_movie_ratings' 任务的结果传递给索引函数。

如果验证显示失败,DAG 将继续执行失败通知任务。在本例中,我们只打印了一条信息,但在实际应用中,我们可以配置警报来通知故障。

最后,我们定义任务依赖关系,确保它们以正确的顺序执行:

下面是我们 DAG 的完整代码:

可视化 DAG 执行

在 Apache Airflow 界面中,我们可以直观地看到 DAG 的执行情况。只需转到"DAGs" 选项卡,找到您创建的 DAG。

下面,我们可以直观地看到任务的执行情况及其各自的状态。通过选择特定日期的执行情况,我们可以访问每个任务的日志。请注意,在index_movie_ratings 任务中,我们可以看到索引中的索引结果,而且索引已成功完成。

在其他选项卡中,可以获取有关任务和 DAG 的更多信息,帮助分析和解决潜在问题。

结论

在本文中,我们演示了如何将 Apache Airflow 与 Elasticsearch 集成以创建数据摄取解决方案。我们展示了如何配置 DAG,定义负责检索、验证和索引电影数据的任务,以及如何在 Airflow 界面监控和可视化这些任务的执行。

这种方法可以很容易地适应不同类型的数据和工作流,使 Airflow 成为在各种情况下协调数据管道的有用工具。

参考资料

阿帕奇气流

https://airflow.apache.org/

使用 Docker 安装 Apache Airflow

https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html

Elasticsearch Python 挂钩

https://airflow.apache.org/docs/apache-airflow-providers-elasticsearch/stable/hooks/elasticsearch_python_hook.html

Python 操作员

https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html

常见问题

什么是阿帕奇气流?

Apache Airflow 是一个用于创建、调度和监控工作流的平台。

什么是 Apache Airflow?

Apache Airflow 用于协调 ETL 流程、数据管道和其他复杂的工作流,具有灵活性和可扩展性。

Apache Airflow 的主要组件是什么?

Airflow 的主要组件包括调度程序、执行器、网络服务器、Dags 文件夹和元数据。

Apache Airflow 可以与 Elasticsearch 集成吗?

是的,Apache Airflow 可以与 Elasticsearch 集成,例如,您可以使用 Apache Airflow 在 Elasticsearch 中协调任务和索引结果。

什么是 Apache Airflow 中的 DAG?

在 Airflow 中,工作流由 DAG(有向无环图)表示。DAG 是一种定义任务执行顺序的结构。

相关内容

准备好打造最先进的搜索体验了吗?

足够先进的搜索不是一个人的努力就能实现的。Elasticsearch 由数据科学家、ML 操作员、工程师以及更多和您一样对搜索充满热情的人提供支持。让我们联系起来,共同打造神奇的搜索体验,让您获得想要的结果。

亲自试用