Uso do Logstash para dividir dados e enviá-los para várias saídas

O Logstash é um pipeline de processamento de dados open source do lado do servidor que ingere dados, os transforma e os envia para uma ou mais saídas. Neste blog, apresentarei um exemplo que mostra como usar o Logstash para ingerir dados de vários mercados de ações e enviar os dados correspondentes a cada mercado de ações para uma saída distinta. Isso é realizado executando as seguintes etapas:

  1. Criar cópias de cada documento a partir de um fluxo de entrada do mercado de ações.
  2. Filtrar cada cópia para conter apenas campos válidos para um determinado mercado de ações
  3. Adicionar metadados a cada cópia para indicar a qual mercado de ações pertence os dados nela contidos.
  4. Avaliar os metadados em cada documento para direcionar o documento para a saída correta.

Observe que, neste post do blog, eu não utilizo a comunicação pipeline a pipeline (beta a partir da versão 6.5), que provavelmente também poderia alcançar algumas das funcionalidades descritas aqui.

Exemplo de arquivo de entrada

Como entrada para o Logstash, usamos um arquivo CSV que contém valores de referência do mercado de ações. Alguns exemplos de entradas do arquivo CSV são fornecidos abaixo:

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

Os valores separados por vírgula representam "hora" ("time") e o valor dos seguintes valores de referência das bolsas de valores: "DAX", "SMI", "CAC" e "FTSE". Copie e cole as linhas acima em um arquivo CSV chamado "stocks.csv" para usar esses dados como uma entrada no exemplo do pipeline do Logstash.

Exemplo de pipeline do Logstash

Abaixo, apresentaremos um pipeline do Logstash que faz o seguinte:

  1. Ler os valores do mercado de ações como entrada de um arquivo no formato CSV.
  2. Mapear cada linha da entrada do arquivo CSV para um documento JSON, onde as colunas do CSV são mapeadas para os seguintes campos JSON: "time", "DAX", "SMI", "CAC" e "FTSE".
  3. Converter o campo de hora no formato Unix.
  4. Usar o plugin de filtro de clone para criar duas cópias de cada documento (essas cópias são adicionais ao documento original). O filtro de clone adiciona automaticamente um novo campo "type" a cada nova cópia do documento, onde "type" corresponde aos nomes fornecidos na matriz de clones. Definimos os tipos como "clone_for_SMI" ou "clone_for_FTSE", e, no fim, cada clone conterá apenas dados para o mercado de ações "SMI" ou "FTSE".
  5. Para cada clone:
    1. Usar o plugin de filtro de remoção para remover todos os campos, exceto aqueles que estiverem na lista de permissões para o mercado de ações específico.
    2. Adicionar metadados a cada documento correspondente ao tipo ("type") que foi adicionado pela função de clone. Isso é necessário porque estamos usando a função de remoção que remove o "tipo" que foi inserido pela função de clone, e essas informações são necessárias no estágio de saída para direcionar o documento para a saída correta.
  6. Usar o plugin de saída do Elasticsearch para Logstash para gravar os documentos de cada mercado de ações em uma saída diferente do Elasticsearch, com a saída determinada pelo valor definido no campo de metadados que adicionamos na Etapa 5. Para simplificar o código abaixo, cada saída do Elasticsearch é gravada em um índice exclusivo em um cluster local do Elasticsearch. Se vários clusters devem ser usados como saídas, cada declaração de saída do Elasticsearch pode ser facilmente modificada para especificar hosts do Elasticsearch exclusivos.

Veja abaixo um pipeline do Logstash que executa as etapas acima (com os números das etapas correspondentes adicionados como comentários). Copie esse pipeline em um arquivo chamado "clones.conf" para execução:

## ETAPA 1
input {
  file {
    # lembre-se de editar o caminho para usar seu arquivo stocks.csv
    path => "${HOME}/stocks.csv"
    # O código a seguir assegurará a releitura da entrada completa 
    # cada vez que o Logstash for executado (útil para depuração).
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## ETAPA 2
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## ETAPA 3  
date {
    match => ['time', 'UNIX']
  }
## ETAPA 4
  # A linha a seguir criará duas cópias adicionais 
  # de cada documento (ou seja, incluindo o 
  # original, três no total). 
  # Cada cópia terá automaticamente um campo "type" adicionado 
  # correspondente ao nome fornecido na matriz.
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## ETAPA 5
  if [type] == 'clone_for_SMI' {
    # Remova tudo, exceto "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## ETAPA 6
output {
  # A saída a seguir para stdout é apenas para depuração 
  # e pode ser removida
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Teste do pipeline do Logstash

Para testar esse pipeline com os dados do CSV de exemplo, você pode executar o comando a seguir, modificando-o para garantir o uso de caminhos corretos para o seu sistema. Observe que a especificação de "config.reload.automatic" é opcional, mas nos permite recarregar "clones.conf" automaticamente sem reiniciar o Logstash:

./logstash -f ./clones.conf --config.reload.automatic

Depois que o Logstash tiver lido o arquivo "stocks.csv" e concluído o processamento, poderemos visualizar os três índices resultantes chamados "smi_data", "ftse_data" e "stocks_original".

Verificar o índice de SMI

GET /smi_data/_search

Isso exibirá documentos com a estrutura a seguir. Observe que apenas os dados de "SMI" aparecem no índice "smi_data".

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5    
        }
      }

Verificar o índice de FTSE

GET /ftse_data/_search

Isso exibirá documentos com a estrutura a seguir. Observe que apenas o campo "FTSE" aparece nos documentos do índice "ftse_data".

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Verificar o índice dos documentos originais

GET /stocks_originals/_search

Isso exibirá documentos com a estrutura a seguir. Observe que a versão original não filtrada dos documentos aparece no índice "stocks_original".

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusão

Neste post do blog, demonstrei um pequeno subconjunto dos recursos do Logstash. Especificamente, apresentei um exemplo que mostrava como usar o Logstash para ingerir dados de várias bolsas de valores e depois processar e direcionar esses dados para saídas distintas. Se você estiver testando o Logstash e o Elastic Stack e tiver alguma dúvida, fique à vontade para procurar ajuda em nossos fóruns de discussão públicos.