使用 Logstash 拆分数据并将数据发送到多个输出

Logstash 是一个开源的服务器端数据处理管道,可以采集和转换数据,并将数据发送到一个或多个输出。在本篇博文中,我将会通过一个例子演示如何使用 Logstash 采集来自多个股票市场的数据,并将各个不同股票市场的相应数据发送到不同的输出。要完成这一过程,需要执行下列步骤:

  1. 为来自股票市场输入流的每个文档创建副本。
  2. 筛选每个副本,以便仅包含对给定股票市场有效的字段。
  3. 向每个副本中添加元数据,以表明其包含的是哪个股票市场的数据。
  4. 评估每个文档中的元数据,以便指导文档生成正确的输出。

请注意在本篇博文中,我并未使用管道至管道通信(在 6.5 中推出了测试版),尽管这一功能也有可能实现此处所描述的部分功能。

示例输入文件

我们将包含股票市场基准值的 CSV 文件作为 Logstash 的输入。下面给出了几条 CSV 条目示例:

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

逗号分隔值表示 “time”(时间)和下列股票交易基准的值:“DAX”、“SMI”、“CAC” 和 “FTSE”。将上面几行复制并粘贴到一个名为 “stocks.csv” 的 CSV 文件中,以便将其用作示例 Logstash 管道中的输入。

示例 Logstash 管道

下面我们将会展示如何通过 Logstash 管道完成下列步骤:

  1. 从 CSV 文件中读取股票市场值,并作为 CSV 格式的输入。
  2. 将 CSV 输入的各行映射至一个 JSON 文档,其中 CSV 输入的各列会映射至下列 JSON 字段:“time”、“DAX”、“SMI”、“CAC” 和 “FTSE”。
  3. 将时间字段转换为 Unix 格式。
  4. 使用 clone filter plugin(克隆筛选插件)为每个文档创建两个副本(在原始文档基础之上创建这些副本)。克隆筛选插件会自动为每个文档副本添加一个新的 “type”(类型)字段,其中 “type” 对应的是克隆阵列中所给的名称。我们已将类型定义为 “clone_for_SMI” 或 “clone_for_FTSE”,而且每个克隆副本最终仅会包含 “SMI” 或 “FTSE” 股票市场的数据。
  5. 对于每个克隆副本:
    1. 使用 prune filter plugin(修剪筛选插件)删除特定股票市场白名单字段之外的所有字段。
    2. 为每个文档添加与通过克隆功能所添加 “type” 对应的元数据。这一点很有必要,因为我们使用修剪功能删除了通过克隆功能插入的 “type”,而在输出阶段中为了指导文档生成正确的输出,这一信息为必填项。
  6. 通过适用于 Logstash 的 Elasticsearch 输出插件将各个股票市场的文档写到不同的 Elasticsearch 输出中,该输出取决于我们在第 5 步中添加的元数据字段所定义的值。在下面为了简化代码,每个 Elasticsearch 输出会向一个本地 Elasticsearch 集群中的不同索引写入内容。如果需要将多个集群作为输出,那么您可以轻松编辑每个 Elasticsearch 输出声明,以指定不同的 Elasticsearch 主机

下面是一个执行上述步骤的 Logstash 管道(且在备注中添加了对应的步骤编号)。 将这一管道复制到一个名为 "clones.conf" 的文件中并执行:

## 第 1 步
input {
  file {
    # 务必编辑路径以便使用您的 stocks.csv 文件
    path => "${HOME}/stocks.csv"
    # 下列内容将会确保 Logstash 每次执行时 
    # 都会重新读取完整输入(进行故障排查时很有用)。
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## 第 2 步
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## 第 3 步  
date {
    match => ['time', 'UNIX']
  }
## 第 4 步
  # 下面一行将会为每个文档 
  # 额外创建 2 个副本(算上 
  # 原始文档的话,共有 3 个)。
  # 每个副本中都会自动添加一个 "type" 字段, 
  # 此字段与阵列中所给的名称对应。
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## 第 5 步
  if [type] == 'clone_for_SMI' {
    # 删除 "SMI" 之外的所有内容
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## 第 6 步
output {
  # 发送到 stdout 的下列输出仅用于故障排查目的, 
  # 您可将其删除
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

测试 Logstash 管道

如要使用示例 CSV 数据对管道进行测试,您可以执行下列命令,可能需要对其进行编辑以确保使用的是您系统的正确路径。请注意,"config.reload.automatic" 部分为可选,但是其能允许您在无需重新启动 Logstash 的条件下重新加载 "clones.conf":

./logstash -f ./clones.conf --config.reload.automatic

一旦 Logstash 读取了 "stocks.csv" 文件并处理完毕,我们可以看到生成了三个索引,名称分别为 "smi_data"、"ftse_data" 和 "stocks_original"。

检查 SMI 索引

GET /smi_data/_search

这将会以下列结构显示文档。注意:在 "smi_data" 索引中仅显示了 “SMI” 数据。

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score":1,
        "_source": {
          "SMI":1688.5    
        }
      }

检查 FTSE 索引

GET /ftse_data/_search

这将会以下列结构显示文档。注意:在 "ftse_data" 索引内的文档中仅会显示 “FTSE” 字段。

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id":"AgRskWUBsYalOV9y9hL0",
        "_score":1,
        "_source": {
          "FTSE":2448.2
        }
      }

检查原始文档索引

GET /stocks_originals/_search

这将会以下列结构显示文档。注意:在 "stocks_original" 索引中会显示未进行筛选的原始文档版本。

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score":1,
        "_source": {
          "host":"Alexanders-MBP",
          "@timestamp":"2017-01-01T00:30:00.000Z",
          "SMI":1678.1,
          "@version":"1",
          "message":"1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC":1772.8,
          "DAX":1628.75,
          "time":"1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE":2443.6
        }
      }

结论

在本篇博文中,我针对 Logstash 演示了一个小型的功能子集合。具体而言,我通过一个例子演示了如何使用 Logstash 采集来自多个股票市场的数据,对其进行处理并将这些数据发送到不同的输出。如果您正在对 Logstash 和 Elastic Stack 进行测试并且在这一过程中遇到了问题,请随时在公共论坛上寻求我们的帮助。