使用 Apache Camel 将数据导入 Elasticsearch 的过程结合了搜索引擎的鲁棒性和集成框架的灵活性。在本文中,我们将探讨 Apache Camel 如何简化和优化 Elasticsearch 的数据摄取。为了说明这一功能,我们将实施一个入门应用程序,逐步演示如何配置和使用 Apache Camel 将数据发送到 Elasticsearch。
什么是 Apache Camel?
Apache Camel 是一个开源集成框架,可简化不同系统之间的连接,让开发人员专注于业务逻辑,而不必担心系统通信的复杂性。Camel 的核心概念是"routes,即" ,它定义了信息从原点到目的地的路径,可能包括转换、验证和过滤等中间步骤。
Apache Camel 架构

Camel 使用"组件" 来连接不同的系统和协议,如数据库和消息服务,并使用"端点" 来表示消息的入口和出口。这些概念提供了模块化和灵活的设计,使其更易于高效、可扩展地配置和管理复杂的集成。
使用 Elasticsearch 和 Apache Camel
我们将演示如何配置一个简单的 Java 应用程序,使用 Apache Camel 将数据摄取到 Elasticsearch 集群中。还将介绍使用 Apache Camel 中定义的路由在 Elasticsearch 中创建、更新和删除数据的过程。
1.添加依赖项
配置此集成的第一步是在项目的pom.xml 文件中添加必要的依赖项。这将包括 Apache Camel 和 Elasticsearch 库。我们将使用新的 Java API 客户端库,因此必须导入camel-elasticsearch 组件,且版本必须与camel-core 库相同。
如果要使用 Java 低级 Rest Client,则必须使用 Elasticsearch 低级 Rest Client 组件。
2.配置和运行 Camel 内核
配置的第一步是使用DefaultCamelContext 类创建一个新的 Camel 上下文,作为定义和执行路由的基础。接下来,我们配置 Elasticsearch 组件,它将允许 Apache Camel 与 Elasticsearch 集群交互。ESlasticsearchComponent 实例被配置为连接到localhost:9200 地址,这是本地 Elasticsearch 集群的默认地址。对于需要身份验证的环境设置,应阅读有关如何配置组件和启用基本身份验证的文档,即"配置组件和启用基本身份验证" 。
然后,该组件会被添加到 Camel 上下文中,使已定义的路由能够使用该组件在 Elasticsearch 中执行操作。
之后,路由会被添加到上下文中。我们将创建用于批量索引、更新和删除文档的路由。
3.配置 Camel 路由
数据索引
我们要配置的第一个路由是用于数据索引。我们将使用一个包含电影目录的 JSON 文件。路由将被配置为读取位于src/main/resources/movies.json 的文件,将 JSON 内容反序列化为 Java 对象,然后应用聚合策略将多条信息合并为一条,以便在 Elasticsearch 中进行批量操作。每条信息的大小配置为 500 条,也就是说,批量索引每次将索引 500 部影片。
批量路由 Elasticsearch 操作
批量文件将被发送到 Elasticsearch 的批量操作端点。这种方法确保了处理大量数据时的效率和速度。
数据更新
下一步将是更新文件。在上一步中,我们为一些电影编制了索引,现在我们将创建新的路径,通过参考代码搜索文档,然后更新评级字段。
我们建立了一个 Camel 上下文(DefaultCamelContext) ,在其中注册了一个 Elasticsearch 组件,并添加了一个自定义路由 IngestionRoute。操作开始时,先通过 ProducerTemplate 发送文档代码,然后从 direct:update-ingestion 端点启动路由。
接下来是 IngestionRoute,它是该流程的输入端点。路由执行多个流水线操作。首先,在 Elasticsearch 中进行搜索,按代码查找文件(direct:search-by-id) ,其中 SearchByCodeProcessor 根据代码组合查询。然后,UpdateRatingProcessor 对检索到的文档进行处理,将结果转换为电影对象,将电影分级更新为特定值,并准备将更新后的文档发回 Elasticsearch 进行更新。
SearchByCodeProcessor 处理器的配置仅用于执行搜索查询:
UpdateRatingProcessor 处理器负责更新评级字段。
数据删除
最后,配置删除文件的路径。在这里,我们将使用文档 ID 删除文档。在 Elasticsearch 中,要删除文档,我们需要知道文档标识符、存储该文档的索引并执行删除请求。在 Apache Camel 中,我们将通过创建一个新路由来执行此操作,如下图所示。
路由从 direct:op-delete 端点开始,它是入口点。需要删除文件时,会在邮件正文中收到其标识符(_id) 。然后,路由使用简单的("${body}") ,从报文正文中提取_id,用该标识符的值设置 indexId 头。
最后,消息会被定向到 URI_DELETE_OPERATION 指定的端点,该端点会连接到 Elasticsearch,以便在相应索引中执行文档删除操作。现在我们已经创建了路由,可以创建一个 Camel 上下文(DefaultCamelContext) ,该上下文已配置为包含 Elasticsearch 组件。
接下来,由OperationDeleteRoute 类定义的删除路由被添加到上下文中。初始化上下文后,ProducerTemplate ,将应删除文档的标识符传递给direct:op-delete 端点,从而触发删除路由。
结论
Apache Camel 和 Elasticsearch 之间的集成可实现稳健高效的数据摄取,利用 Camel 的灵活性定义路由,从而处理不同的数据操作场景,如索引、更新和删除。通过这种设置,您可以以可扩展的方式协调和自动化复杂的流程,确保您的数据在 Elasticsearch 中得到有效管理。该示例演示了如何将这些工具结合使用,以创建高效、适应性强的数据摄取解决方案。
参考资料
常见问题
什么是 Apache Camel?
Apache Camel 是一个开源集成框架,可简化不同系统之间的连接,让开发人员专注于业务逻辑,而不必担心系统通信的复杂性。




