Logstash 是一个数据处理管道,可从多个来源摄取数据,对其进行转换,然后发送到您选择的目的地。过滤器插件是这一过程的关键;当数据通过管道时,它们会对数据执行特定操作。
Logstash 包含多个内置过滤器,可用于解析、丰富和修改数据等常见任务。但有时,您会遇到需要自定义逻辑的情况,这超出了这些标准过滤器所能提供的范围。这就是Ruby 过滤器插件的用武之地。
Ruby 过滤器插件允许您在 Logstash 管道中直接执行自定义 Ruby 代码。如果标准过滤器还不够用,Ruby 过滤器还能让您处理复杂的数据转换、实施自定义业务逻辑或与外部系统集成。
在本博客中,我们将探讨如何从基础到高级使用 Ruby 过滤器。
何时使用 Ruby 过滤器?
作为 Elastic 的咨询架构师,我经常看到客户在数据处理管道中使用 Logstash,尽管现在它还不是最先进的数据处理引擎。当涉及到复杂的数据操作或自定义逻辑时,他们往往会因为标准过滤器的局限性而苦恼。在这种情况下,Ruby 过滤器可以帮助克服这些挑战。
当标准 Logstash 过滤器无法满足您的特定要求时,Ruby 过滤器就会派上用场。下面是一些常见的使用案例:
- 深度嵌套数据操作:修改复杂的 JSON 结构、数组中的数组或根据内容动态重组数据
- 高级字符串处理:从非结构化文本中解析和提取结构化数据
- 实施复杂的业务逻辑:创建需要条件逻辑、循环或复杂计算的自定义转换
基本用法
让我们从一个简单的例子开始,了解 Ruby 过滤器是如何工作的。
配置 Ruby 过滤器
创建 Logstash 管道时,应将配置文件放在/etc/logstash/conf.d 目录中。或者,你也可以使用-f 选项,在手动启动 Logstash 时指定配置文件的路径,这样就可以轻松试验你的管道了。
配置文件的扩展名应为.conf 。
要使用 Ruby 过滤器,请在 Logstash 管道配置 (*.conf) 文件的过滤器部分定义ruby 过滤器。下面是一个基本例子:
此内联 Ruby 过滤器在 Logstash 配置中定义了一个 Ruby 过滤器实例。code 参数提供了 Logstash 将为该过滤器处理的每个事件执行的内联 Ruby 脚本。在该脚本中,有一个event 变量可用于表示事件本身。事件对象包含发送到 Logstash 的原始数据以及在 Logstash 过滤阶段创建的任何附加字段。您可以通过 Logstash 事件 API 访问这些字段,如event.get() 和event.set() 。在此示例代码中,event.set('new_field', 'Hello from Ruby!') 将名为new_field 的新字段设置为字符串值Hello from Ruby! 。您可以根据需要在code 块中添加任何其他代码。
请注意,这个event 对象虽然是键值类型的数据容器,但它并不是通常的 Ruby 哈希对象。查看此官方文档,了解有关事件 API 的更多信息。
外部化 Ruby 脚本
对于简单的转换,内联 Ruby 代码非常方便。但是,对于复杂的逻辑或可重复使用的功能,建议将代码移到外部 Ruby 脚本中。这样可以提高可维护性,并保持 Logstash 管道配置的整洁。
首先,创建 Ruby 脚本并将其保存为my_ruby_script.rb 。脚本必须定义一个处理事件的filter 方法。它将一个事件对象作为参数,该对象代表正在处理的当前事件。filter 方法需要返回一个要发射的事件数组。要删除事件,返回空数组。
例如,下面的脚本读取message 字段,计算其长度,并将结果存储到名为message_length 的新字段中。
接下来,使用path 选项设置 Ruby 过滤器配置,以引用脚本。这会告诉 Logstash 加载并执行外部脚本。使用外部脚本时,请确保文件存在并具有正确的权限。
现在,每个事件都会被传递到my_ruby_script.rb 中的filter 方法,并由它进行处理。
这种方法能帮助你更有效地管理复杂的逻辑,使测试、调试和重用 Ruby 代码变得更容易。
高级用法
在本节中,我们将探讨在 Logstash 中使用 Ruby 过滤器的一些高级示例。这些示例将演示如何使用 Ruby 执行数据转换、丰富事件和实现自定义逻辑。
操作嵌套数据结构
Logstash 事件是 Logstash 处理的核心数据结构。它可以包含各种字段,包括嵌套数据结构,如数组和哈希值。通过 Ruby 过滤器,您可以轻松处理这些嵌套结构。
Ruby 过滤器可以处理嵌套数据结构,如哈希值和数组,允许你修改或添加这些结构中的字段。这在处理 JSON 等复杂数据格式时非常有用。
此示例在输入数据中包含一个嵌套 JSON 对象。Ruby 过滤器通过添加新的键值对来修改嵌套数据。标准 Logstash 过滤器无法对嵌套数据进行此类操作,因此 Ruby 过滤器成为复杂数据结构的便捷选择。
将单个事件拆分为多个事件
Ruby 过滤器还可用于将单个事件分割成多个事件。当您有一个包含项目数组的单个事件,并希望为每个项目创建单独的事件时,这个功能就非常有用。
请注意,Elasticsearch 的摄取管道和 Beats/Elastic Agent 的处理器都不支持拆分事件。这是 Logstash 最强大的用例之一。
带分体式过滤器
您可以使用split 过滤器,根据指定字段将一个事件拆分成多个事件。不过,如果需要在分割过程中执行额外的转换或逻辑,可以将 Ruby 过滤器与分割过滤器结合使用。
在下面的示例中,我们将 RSS 订阅作为一行 XML 文本。它包含多个<item> 元素。Ruby 过滤器用于从 XML 中提取<item> 元素,并将其存储在名为items 的新字段中。然后,根据items 字段,使用拆分过滤器将事件拆分成多个事件。
输出结果如下
您可能已经注意到,在这种情况下,ruby 过滤器并不是必不可少的。split 过滤器可用于根据items 字段将事件拆分成多个事件,而mutate 过滤器可用于删除不必要的字段。不过,如果需要在分割过程中执行额外的转换或逻辑,可以使用 Ruby 过滤器。
使用内联 Ruby 脚本
您还可以使用内联 Ruby 脚本,通过event.clone 方法和new_event_block variable ,将单个事件拆分成多个事件,如new_event_block.call(new_event) 。这样,您就可以根据原始事件创建新事件,同时保留其数据。
下面是一个如何使用 Ruby 过滤器将单个事件分割成多个事件的示例。输入和输出与上一个示例相同。
使用外部 Ruby 脚本
您还可以使用外部 Ruby 脚本将单个事件分割成多个事件。
配置文件:
Ruby 脚本需要外部化为split_event.rb :
请记住,filter 方法必须返回一个事件数组。您可以通过克隆传入的事件对象并将其添加到数组中来返回多个事件,也可以将单个事件作为包含一个元素的数组来返回。
这样就可以将单个事件拆分成多个事件。
执行外部命令并解析其输出结果
Logstash exec 输入插件允许您执行外部命令,其输出将成为 Logstash 的事件。命令的输出将存储在事件的message 字段中。
通常情况下,系统命令的输出是人类可读的,但并不是 Logstash 可以轻松解析的 JSON 或其他格式的结构。要处理这种情况,可以使用 Ruby 过滤器解析输出并从中提取信息。
下面是一个使用exec 输入插件执行ps -ef 命令的示例,该命令可列出类 Unix 系统上所有正在运行的进程。Ruby 过滤器将对输出进行解析,以提取每个进程的相关信息。
本例使用exec 输入插件,每 60 秒运行一次ps -ef 命令。Ruby 过滤器处理输出,提取相关字段,如 UID、PID、PPID、CPU 使用率 (C)、开始时间 (STIME)、TTY、CPU 总时间 (TIME) 和执行的命令 (CMD)。它在我的 macOS 环境中运行良好,但你可能需要调整 regex 模式,以匹配你系统中ps -ef 命令的输出格式。
使用内置库
Ruby 过滤器插件允许您使用内置的 Ruby 库,这对各种任务都非常有用。例如,您可以使用json 库解析 JSON 字符串,或使用date 库处理日期。
下面是一个使用json 库解析存储在字段中的 JSON 字符串的示例:
为了避免每次都需要使用库,您应该将 Ruby 代码外部化,以便在 Ruby 过滤器脚本的开头使用require 语句。这将加载一次库,使其可在脚本中使用。
要检查环境中哪些库可用,可以在 Ruby 过滤器中运行以下代码,列出内置库:
注意: Logstash 官方不支持内置库,它们的行为可能会发生变化,或者在未来版本中不可用。使用风险自负。
结论
Logstash Ruby 过滤器允许您自定义和扩展 Logstash 管道的功能。在本篇文章中,我们介绍了使用 Ruby 过滤器的基础知识,并提供了高级用法示例。
利用 Ruby 过滤器,您可以处理需要自定义逻辑或高级操作的复杂数据处理任务。无论您是处理嵌套数据结构、拆分事件,还是解析复杂/非结构化文本并将其转换为结构化 JSON,Ruby 过滤器都能灵活满足您的特定需求。
希望本指南能为您提供探索 Logstash Ruby 过滤器全部潜力的知识和灵感。祝您编写脚本愉快!




