Ingeniería

Cómo usar Logstash para dividir datos y enviarlos a múltiples salidas

Logstash es un pipeline de procesamiento de datos de open source del lado del servidor que ingesta datos, los transforma y luego los envía a una o más salidas. En este blog, presentaremos un ejemplo que muestra cómo usar Logstash para ingestar datos de múltiples mercados de valores y enviar los datos que corresponden a cada mercado de valores único a una salida diferente. Esto se logra siguiendo estos pasos:

  1. Crea copias de cada documento de un flujo de entrada del mercado de valores.
  2. Filtra cada copia para ver solo campos que sean válidos para un mercado de valores determinado.
  3. Agrega metadatos a cada copia para indicar qué datos del mercado de valores contiene.
  4. Evalúa los metadatos en cada documento para dirigir el documento hacia la salida correcta.

Ten en cuenta que en este blog, no usamos la comunicación de pipeline a pipeline (beta desde la versión 6.5), con la que sería posible lograr parte de la funcionalidad descrita aquí.

Archivo de entrada de ejemplo

Como entrada a Logstash, usamos un archivo CSV que contiene valores de referencia del mercado de valores. A continuación, se detallan algunos ejemplos de entradas CSV:

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

Los valores separados por comas representan "tiempo" y el valor de las siguientes referencias bursátiles: "DAX", "SMI", "CAC" y "FTSE". Copia y pega las líneas anteriores en un archivo CSV llamado "stocks.cvs" para usarlo como entrada en el pipeline de Logstash de ejemplo.

Pipeline de Logstash de ejemplo

A continuación, presentamos un pipeline de Logstash que realiza lo siguiente:

  1. Lee los valores del mercado como entrada con formato CSV desde un archivo CSV.
  2. Mapea cada fila de la entrada CSV a un documento JSON, donde las columnas CSV mapean los siguientes campos JSON: "time", "DAX","SMI", "CAC" y "FTSE".
  3. Convierte el campo de hora a formato Unix.
  4. Usa el plugin de filtro de clones para crear dos copias de cada documento (estas copias son adicionales al documento original). El filtro de clones agrega automáticamente un nuevo campo "type" a cada nueva copia del documento, en el que "type" corresponde a los nombres dados en la matriz de clones. Hemos definido los tipos como "clone_for_SMI" o "clone_for_FTSE", y cada clon al final solo contendrá datos de los mercados de valores "SMI" o "FTSE".
  5. Para cada clon:
    1. Usa el plugin prune filter para quitar todos los campos, salvo aquellos en la lista blanca del mercado de valores específico.
    2. Agrega metadatos a cada documento que se correspondan con el "tipo" que se ha agregado con la función clonar. Esto es necesario porque usamos la función eliminar, que quita el "tipo" que se insertó con la función clonar, y esta información es necesaria en la etapa de salida para direccionar el documento a la salida correcta.
  6. Usa el plugin de salida de Elasticsearch para Logstash para escribir los documentos para cada mercado de valores de manera que vayan a una salida de Elasticsearch diferente, con la salida determinada mediante el valor definido en el campo de metadatos que agregamos en el paso 5. Para simplificar el siguiente código, cada salida de Elasticsearch escribe a un índice único en un cluster local de Elasticsearch. Si se deben usar múltiples clusters como salidas, cada declaración de salida de Elasticsearch podrá modificarse fácilmente para especificar hosts de Elasticsearch únicos.

A continuación, detallamos un pipeline de Logstash que ejecuta los pasos anteriores (con los números de paso correspondientes detallados como comentarios). Copia este pipeline en un archivo llamado "clones.conf" para ejecutarlo:

## PASO 1
input {
  file {
    # asegúrate de editar la ruta para usar el archivo stocks.csv
    path => "${HOME}/stocks.csv"
    # Lo siguiente asegurará la relectura de la entrada completa 
    # cada vez que se ejecute Logstash (útil para depuración).
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## PASO 2
filter {
   csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## PASO 3  
date {
    match => ['time', 'UNIX']
  }
## PASO 4
  # La siguiente línea creará 2 copias 
  # adicionales de cada documento (es decir, incluido 
  # el original, 3 en total). 
  # Se agregará automáticamente a cada copia un campo "type" 
  # correspondiente al nombre dado en la matriz.
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## PASO 5
  if [type] == 'clone_for_SMI' {
    # Elimina todo, excepto "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## PASO 6
output {
  # La siguiente salida a stdout es solo para depuración 
  # y puede eliminarse
  stdout { 
    codec =>  rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}

Prueba del pipeline de Logstash

Para probar este pipeline con los datos CSV de ejemplo, podrías ejecutar el siguiente comando, modificándolo para asegurarte de que estás usando rutas correctas para tu sistema. Ten en cuenta que especificar "config.reload.automatic" es opcional, pero nos permite recargar automáticamente "clones.conf" sin reiniciar Logstash:

./logstash -f ./clones.conf --config.reload.automatic

Una vez que Logstash ha leído el archivo "stocks.csv" y completado el procesamiento, podemos ver los tres índices resultantes llamados "smi_data", "ftse_data" y "stocks_original".

Comprueba el índice SMI

GET /smi_data/_search

Esto mostrará los documentos con la siguiente estructura. Ten en cuenta que solo aparecen datos "SMI" en el índice "smi_data".

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5    
        }
      }

Comprueba el índice FTSE

GET /ftse_data/_search

Esto mostrará los documentos con la siguiente estructura. Ten en cuenta que solo el campo "FTSE" aparece en los documentos en el índice "ftse_data".

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Comprueba el índice de documentos original

GET /stocks_originals/_search

Esto mostrará los documentos con la siguiente estructura. Ten en cuenta que la versión original sin filtro de los documentos aparece en el índice "stocks_original".

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusión

En este blog, hemos demostrado un subconjunto pequeño de las capacidades de Logstash. Específicamente, presentamos un ejemplo que muestra cómo usar Logstash para ingestar datos desde múltiples mercados de valores, y después procesarlos y redirigir esos datos a salidas diferentes. Si estás probando Logstash y el Elastic Stack y tienes preguntas, no dudes en buscar ayuda en nuestros foros de debate públicos.