工程

如何创建易于维护和再利用的 Logstash 管道

Logstash 是一个开源的数据处理管道,可以从一个或多个输入采集事件,对事件进行转换,并将每个事件发送到一个或多个输出。某些 Logstash 实施过程可能包括多行代码,并且可能处理来自多个输入源的事件。为了让此类实施过程更加易于维护,我在这里将会演示如何基于模块化组件创建管道来提高代码的再利用率。

设计初衷

Logstash 经常需要对来自多个输入源的的事件应用相同的逻辑子集合。这通常会通过下面两种方法之一来实现: 

  1. 在单一管道中处理来自不同输入源的事件,这样便可以轻松地对来自所有来源的全部事件应用相同的逻辑。在此类实施过程中,不仅有相同的逻辑,通常还有大量的条件型逻辑。所以,这一方法通常会导致 Logstash 实施过程十分复杂难懂。
  2. 处理来自每个唯一输入源的事件时,执行唯一的管道。这一方法要求将相同的功能复制和拷贝到每个管道中,所以很难维护代码中的相同部分。 

本篇博文中演示的技巧会将模块化管道组件存储在不同文件中,然后再将这些组件组合在一起以构建管道,所以该方法能够克服上述方法中的缺陷。此方法能够降低管道复杂程度,还能消除重复代码。

模块化管道的构建

Logstash 配置文件由输入、筛选和输出构成,这些部分均会由 Logstash 管道予以执行。  在更高级的设置中,通常会有一个 Logstash 实例来执行多个管道。默认情况下,当您在没有参数的条件下启动 Logstash 时,其会读取一个名为 pipelines.yml 的文件,并为指定的管道创建实例。 

Logstash 的输入、筛选和输出可以存储在多个文件中,用户可通过指定一个 glob 表达式来选择这些文件,将它们包括在一个管道中。与 glob 表达式匹配的文件将会按照字母顺序组合在一起。由于筛选的执行顺序通常十分重要,所以下面这一点会十分有帮助:在文件名称中包括一个数字标识符,以便确保文件会按照希望的顺序进行组合。

我们在下面会定义两个特有管道,它们均由多个模块化 Logstash 组件组合而成。我们将 Logstash 组件存储在下列文件中:

  • 输入声明:01_in.cfg, 02_in.cfg
  • 筛选声明:01_filter.cfg, 02_filter.cfg, 03_filter.cfg
  • 输出声明:01_out.cfg

然后,我们使用下面的 glob 表达式来在 pipelines.yml 中定义由所需组件构成的管道: 

- pipeline.id: my-pipeline_1
  path.config: "<path>/{01_in,01_filter,02_filter,01_out}.cfg"
- pipeline.id: my-pipeline_2
  path.config: "<path>/{02_in,02_filter,03_filter,01_out}.cfg"

在上面的管道配置中,文件 02_filter.cfg 在两个管道中都有出现,这向您展示了如何在单一文件中定义和维护两个管道中的相同代码,以及多个管道如何对相同部分予以执行。

测试管道

在这一部分,我们将会提供文件的具体示例,这些文件将会组合在一起成为上面 pipelines.yml 中所定义的唯一管道。然后我们会使用这些文件运行 Logstash,并展示生成的输出。 

配置文件

输入文件:01_in.cfg

此文件定义了一个输入,此输入是一个生成器。此生成器输入旨在对 Logstash 进行测试,在这个示例中,它将会生成一个事件。 

input { 
  generator { 
    lines => ["Generated line"] 
    count => 1 
  } 
}

输入文件:02_in.cfg

此文件定义了一个 Logstash 输入,此输入会听取 stdin(标准输入)。

input { 
  stdin {}
}

筛选文件:01_filter.cfg

filter { 
  mutate { 
    add_field => { "filter_name" => "Filter 01" } 
  } 
}

筛选文件:02_filter.cfg

filter { 
  mutate { 
    add_field => { "filter_name" => "Filter 02" } 
  } 
}

筛选文件:03_filter.cfg

filter { 
  mutate { 
    add_field => { "filter_name" => "Filter 03" } 
  } 
}

输出文件:01_out.cfg

output { 
  stdout { codec =>  "rubydebug" } 
}

执行管道

在无任何选项的情况下启动 Logstash,这将会执行我们在前面定义的 pipelines.yml 文件。使用如下代码运行 Logstash: 

./bin/logstash

由于管道 my-pipeline_1 正在执行生成器来模拟输入事件,我们应该可在 Logstash 初始化完毕后立即看到下列输出。从中可以看出,此管道已按照我们的预期执行了 01_filter.cfg02_filter.cfg 中的内容。

{ 
       "sequence" => 0,
           "host" => "alexandersmbp2.lan",
        "message" => "Generated line",
     "@timestamp" => 2020-02-05T22:10:09.495Z,
       "@version" => "1",
    "filter_name" => [ 
        [0] "Filter 01",
        [1] "Filter 02" 
    ] 
}

由于另一个名为 my-pipeline_2 的管道正在等待 stdin(标准输入)上的输入,所以我们看到该管道尚未处理任何事件。向 Logstash 运行所在的终端中输入一些内容,然后按下 Return 来为该管道创建一个事件。完成此操作后,您应会看到类似下面的内容: 

{ 
    "filter_name" => [ 
        [0] "Filter 02",
        [1] "Filter 03" 
    ],
           "host" => "alexandersmbp2.lan",
        "message" => "I’m testing my-pipeline_2",
     "@timestamp" => 2020-02-05T22:20:43.250Z,
       "@version" => "1" 
}

从上面可以看到,02_filter.cfg03_filter.cfg 中的逻辑已按照我们的预期得到了应用。  

执行顺序

请注意 Logstash 并不会关注 glob 表达式内文件的顺序。Logstash 仅会使用 glob 表达式来确定需要包括哪些文件,然后按照字母顺序对文件进行排序。也就是说,如果我们更改 my-pipeline_2 的定义,即在 glob 表达式中 03_filter.cfg 出现在 02_filter.cfg 之前,每个事件仍会先通过 02_filter.cfg 中的筛选,然后再通过 03_filter.cfg 中的筛选。

结论

通过使用 glob 表达式,能够允许借助模块化组件(以单独文档的形式进行保存)构成 Logstash 管道。此种方法能够提高代码的易维护性、再利用率和可读性。 

另外提醒一下,除了本篇博文讲述的技巧,您还应该考虑管道至管道通信,看看其能否提高 Logstash 部署过程的模块化程度。