使用 Logstash 拆分数据并将数据发送到多个输出
Logstash 是一个开源的服务器端数据处理管道,可以采集和转换数据,并将数据发送到一个或多个输出。在本篇博文中,我将会通过一个例子演示如何使用 Logstash 采集来自多个股票市场的数据,并将各个不同股票市场的相应数据发送到不同的输出。要完成这一过程,需要执行下列步骤:
- 为来自股票市场输入流的每个文档创建副本。
- 筛选每个副本,以便仅包含对给定股票市场有效的字段。
- 向每个副本中添加元数据,以表明其包含的是哪个股票市场的数据。
- 评估每个文档中的元数据,以便指导文档生成正确的输出。
请注意在本篇博文中,我并未使用管道至管道通信(在 6.5 中推出了测试版),尽管这一功能也有可能实现此处所描述的部分功能。
示例输入文件
我们将包含股票市场基准值的 CSV 文件作为 Logstash 的输入。下面给出了几条 CSV 条目示例:
1483230600,1628.75,1678.1,1772.8,2443.6 1483232400,1613.63,1688.5,1750.5,2460.2 1483234200,1606.51,1678.6,1718,2448.2 1483236000,1621.04,1684.1,1708.1,2470.4
逗号分隔值表示 “time”(时间)和下列股票交易基准的值:“DAX”、“SMI”、“CAC” 和 “FTSE”。将上面几行复制并粘贴到一个名为 “stocks.csv” 的 CSV 文件中,以便将其用作示例 Logstash 管道中的输入。
示例 Logstash 管道
下面我们将会展示如何通过 Logstash 管道完成下列步骤:
- 从 CSV 文件中读取股票市场值,并作为 CSV 格式的输入。
- 将 CSV 输入的各行映射至一个 JSON 文档,其中 CSV 输入的各列会映射至下列 JSON 字段:“time”、“DAX”、“SMI”、“CAC” 和 “FTSE”。
- 将时间字段转换为 Unix 格式。
- 使用 clone filter plugin(克隆筛选插件)为每个文档创建两个副本(在原始文档基础之上创建这些副本)。克隆筛选插件会自动为每个文档副本添加一个新的 “type”(类型)字段,其中 “type” 对应的是克隆阵列中所给的名称。我们已将类型定义为 “clone_for_SMI” 或 “clone_for_FTSE”,而且每个克隆副本最终仅会包含 “SMI” 或 “FTSE” 股票市场的数据。
- 对于每个克隆副本:
- 使用 prune filter plugin(修剪筛选插件)删除特定股票市场白名单字段之外的所有字段。
- 为每个文档添加与通过克隆功能所添加 “type” 对应的元数据。这一点很有必要,因为我们使用修剪功能删除了通过克隆功能插入的 “type”,而在输出阶段中为了指导文档生成正确的输出,这一信息为必填项。
- 通过适用于 Logstash 的 Elasticsearch 输出插件将各个股票市场的文档写到不同的 Elasticsearch 输出中,该输出取决于我们在第 5 步中添加的元数据字段所定义的值。在下面为了简化代码,每个 Elasticsearch 输出会向一个本地 Elasticsearch 集群中的不同索引写入内容。如果需要将多个集群作为输出,那么您可以轻松编辑每个 Elasticsearch 输出声明,以指定不同的 Elasticsearch 主机。
下面是一个执行上述步骤的 Logstash 管道(且在备注中添加了对应的步骤编号)。 将这一管道复制到一个名为 "clones.conf" 的文件中并执行:
## 第 1 步 input { file { # 务必编辑路径以便使用您的 stocks.csv 文件 path => "${HOME}/stocks.csv" # 下列内容将会确保 Logstash 每次执行时 # 都会重新读取完整输入(进行故障排查时很有用)。 start_position => "beginning" sincedb_path => "/dev/null" } } ## 第 2 步 filter { csv { columns => ["time","DAX","SMI","CAC","FTSE"] separator => "," convert => { 'DAX' => 'float' 'SMI' => 'float' 'CAC' => 'float' 'FTSE' => 'float' } } ## 第 3 步 date { match => ['time', 'UNIX'] } ## 第 4 步 # 下面一行将会为每个文档 # 额外创建 2 个副本(算上 # 原始文档的话,共有 3 个)。 # 每个副本中都会自动添加一个 "type" 字段, # 此字段与阵列中所给的名称对应。 clone { clones => ['clone_for_SMI', 'clone_for_FTSE'] } ## 第 5 步 if [type] == 'clone_for_SMI' { # 删除 "SMI" 之外的所有内容 prune { whitelist_names => [ "SMI"] } mutate { add_field => { "[@metadata][type]" => "only_SMI" } } } else if [type] == 'clone_for_FTSE' { prune { whitelist_names => [ "FTSE"] } mutate { add_field => { "[@metadata][type]" => "only_FTSE" } } } } ## 第 6 步 output { # 发送到 stdout 的下列输出仅用于故障排查目的, # 您可将其删除 stdout { codec => rubydebug { metadata => true } } if [@metadata][type] == 'only_SMI' { elasticsearch { index => "smi_data" } } else if [@metadata][type] == 'only_FTSE' { elasticsearch { index => "ftse_data" } } else { elasticsearch { index => "stocks_original" } } }
测试 Logstash 管道
如要使用示例 CSV 数据对管道进行测试,您可以执行下列命令,可能需要对其进行编辑以确保使用的是您系统的正确路径。请注意,"config.reload.automatic" 部分为可选,但是其能允许您在无需重新启动 Logstash 的条件下重新加载 "clones.conf":
./logstash -f ./clones.conf --config.reload.automatic
一旦 Logstash 读取了 "stocks.csv" 文件并处理完毕,我们可以看到生成了三个索引,名称分别为 "smi_data"、"ftse_data" 和 "stocks_original"。
检查 SMI 索引
GET /smi_data/_search
这将会以下列结构显示文档。注意:在 "smi_data" 索引中仅显示了 “SMI” 数据。
{ "_index": "smi_data", "_type": "doc", "_id": "_QRskWUBsYalOV9y9hGJ", "_score":1, "_source": { "SMI":1688.5 } }
检查 FTSE 索引
GET /ftse_data/_search
这将会以下列结构显示文档。注意:在 "ftse_data" 索引内的文档中仅会显示 “FTSE” 字段。
{ "_index": "ftse_data", "_type": "doc", "_id":"AgRskWUBsYalOV9y9hL0", "_score":1, "_source": { "FTSE":2448.2 } }
检查原始文档索引
GET /stocks_originals/_search
这将会以下列结构显示文档。注意:在 "stocks_original" 索引中会显示未进行筛选的原始文档版本。
{ "_index": "stocks_original", "_type": "doc", "_id": "-QRskWUBsYalOV9y9hFo", "_score":1, "_source": { "host":"Alexanders-MBP", "@timestamp":"2017-01-01T00:30:00.000Z", "SMI":1678.1, "@version":"1", "message":"1483230600,1628.75,1678.1,1772.8,2443.6", "CAC":1772.8, "DAX":1628.75, "time":"1483230600", "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv", "FTSE":2443.6 } }
结论
在本篇博文中,我针对 Logstash 演示了一个小型的功能子集合。具体而言,我通过一个例子演示了如何使用 Logstash 采集来自多个股票市场的数据,对其进行处理并将这些数据发送到不同的输出。如果您正在对 Logstash 和 Elastic Stack 进行测试并且在这一过程中遇到了问题,请随时在公共论坛上寻求我们的帮助。