Using Logstash to Split Data and Send it to Multiple Outputs

Logstash is an open source, server-side data processing pipeline that ingests data, transforms it, and then sends it to one or more outputs. In this blog, I will present an example that shows how to use Logstash to ingest data from multiple stock markets and to send the data corresponding to each unique stock market to a distinct output. This is accomplished by executing the following steps:

  1. Create copies of each document from a stock market input stream.
  2. Filter each copy so that it contains only the fields that are valid for a given stock market.
  3. Add metadata to each copy to indicate which stock market’s data it contains.
  4. Evaluate the metadata in each document to direct the document to the correct output.

Note that in this blog post, I do not make use of pipeline-to-pipeline communication (beta as of 6.5), which could likely also achieve some of the functionality described here.
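
For readers who want to explore that alternative, a minimal pipeline-to-pipeline sketch in "pipelines.yml" could look like the following. The pipeline IDs and the virtual address "smi_docs" are hypothetical, and this approach is not used anywhere else in this post:

- pipeline.id: upstream-example
  config.string: |
    # Hypothetical upstream pipeline that forwards every event
    # to the virtual address "smi_docs".
    input { stdin {} }
    output { pipeline { send_to => ["smi_docs"] } }
- pipeline.id: downstream-example
  config.string: |
    # Hypothetical downstream pipeline that receives those events.
    input { pipeline { address => "smi_docs" } }
    output { stdout {} }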

Example input file

As an input to Logstash, we use a CSV file that contains stock market benchmark values. A few example CSV entries are given below:

1483230600,1628.75,1678.1,1772.8,2443.6
1483232400,1613.63,1688.5,1750.5,2460.2
1483234200,1606.51,1678.6,1718,2448.2
1483236000,1621.04,1684.1,1708.1,2470.4

The comma-separated values represent “time” and the values of the following stock exchange benchmarks: “DAX”, “SMI”, “CAC”, and “FTSE”. Copy and paste the above lines into a CSV file called "stocks.csv" in order to use it as input to the example Logstash pipeline.
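
For reference, the first CSV row shown above corresponds to a document with the following stock market fields once it has been parsed by the pipeline below (additional bookkeeping fields such as “@timestamp” are omitted here):

{
  "time": "1483230600",
  "DAX": 1628.75,
  "SMI": 1678.1,
  "CAC": 1772.8,
  "FTSE": 2443.6
}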

Example Logstash pipeline

Below we will present a Logstash pipeline that does the following:

  1. Read stock market values from a CSV-formatted input file.
  2. Map each row of the CSV input to a JSON document, where the CSV columns map to the following JSON fields:  “time”, “DAX”, “SMI”, “CAC”, and “FTSE”.
  3. Parse the “time” field (a Unix timestamp) and use it to set each document’s “@timestamp”.
  4. Use the clone filter plugin to create two copies of each document (these copies are in addition to the original document). The clone filter automatically adds a new “type” field to each new document copy, where the “type” corresponds to the names given in the clones array. We have defined the types to be “clone_for_SMI” or “clone_for_FTSE”, and each clone will ultimately only contain data for either the “SMI” or “FTSE” stock market.
  5. For each clone:
    1. Use the prune filter plugin to remove all fields except those fields which are whitelisted for the specific stock market.
    2. Add metadata to each document corresponding to the “type” that was added by the clone function. This is necessary because we are using the prune function which removes the “type” that was inserted by the clone function, and this information is required in the output stage for directing the document to the correct output.
  6. Use the Elasticsearch output plugin for Logstash to write the documents for each stock market to a different Elasticsearch output, with the output determined by the value of the metadata field that we added in Step 5. To keep the code below simple, each Elasticsearch output writes to a unique index in a local Elasticsearch cluster. If multiple clusters are to be used as outputs, each Elasticsearch output declaration can easily be modified to specify a unique set of Elasticsearch hosts, as shown in the sketch after the pipeline below.

Below is a Logstash pipeline that executes the above steps (with corresponding step numbers added as comments). Copy this pipeline into a file called "clones.conf" for execution:

## STEP 1
input {
  file {
    # make sure to edit the path to use your stocks.csv file
    path => "${HOME}/stocks.csv"
    # The following will ensure re-reading of full input 
    # each time Logstash executes (useful for debugging).
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
## STEP 2
filter {
  csv {
    columns => ["time","DAX","SMI","CAC","FTSE"]
    separator => ","
    convert => { 
      'DAX' => 'float'
      'SMI' => 'float'
      'CAC' => 'float'
      'FTSE' => 'float'
    }
  }
## STEP 3
  # Parse the "time" field as a Unix epoch timestamp and use it
  # to set each document's @timestamp.
  date {
    match => ['time', 'UNIX']
  }
## STEP 4
  # The following line will create 2 additional 
  # copies of each document (i.e. including the 
  # original, 3 in total). 
  # Each copy will automatically have a "type" field added 
  # corresponding to the name given in the array.
  clone {
    clones => ['clone_for_SMI', 'clone_for_FTSE']
  }
## STEP 5
  if [type] == 'clone_for_SMI' {
    # Remove everything except "SMI"
    prune {
       whitelist_names => [ "SMI"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_SMI" } 
    }
  } 
  else if [type] == 'clone_for_FTSE' {
    # Remove everything except "FTSE"
    prune {
       whitelist_names => [ "FTSE"]
    }
    mutate {
      add_field => { "[@metadata][type]" => "only_FTSE" } 
    }
  } 
}
## STEP 6
output {
  # The following output to stdout is just for debugging 
  # and can be removed
  stdout { 
    codec => rubydebug {
      metadata => true
    }
  }
  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      index => "smi_data"
    }
  }
  else if [@metadata][type] == 'only_FTSE' {
    elasticsearch {
      index => "ftse_data"
    }
  }
  else {
    elasticsearch {
      index => "stocks_original"
    }
  }
}
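
As mentioned in step 6, if each stock market's documents should be sent to a different Elasticsearch cluster, every elasticsearch output block can declare its own hosts. A minimal sketch of one such block is shown below; the cluster URL is a placeholder, not a real address:

  if [@metadata][type] == 'only_SMI' {
    elasticsearch {
      # hypothetical cluster address -- replace with your own
      hosts => ["http://smi-cluster.example.com:9200"]
      index => "smi_data"
    }
  }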

Testing the Logstash pipeline

To test this pipeline with the example CSV data, execute the following command, modifying the paths so that they are correct for your system. Note that specifying "config.reload.automatic" is optional; it allows "clones.conf" to be reloaded automatically without restarting Logstash:

./logstash -f ./clones.conf --config.reload.automatic

Once Logstash has read the "stocks.csv" file and completed processing, we can inspect the three resulting indices: "smi_data", "ftse_data", and "stocks_original".
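
As a quick sanity check, you can first confirm that all three indices were created, for example with the _cat indices API:

GET /_cat/indices/smi_data,ftse_data,stocks_original?v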

Check the SMI index

GET /smi_data/_search

This will display documents with the following structure. Notice that only “SMI” data appears in the "smi_data" index.

      {
        "_index": "smi_data",
        "_type": "doc",
        "_id": "_QRskWUBsYalOV9y9hGJ",
        "_score": 1,
        "_source": {
          "SMI": 1688.5    
        }
      }

Check the FTSE index

GET /ftse_data/_search

This will display documents with the following structure. Notice that only the “FTSE” field appears in documents in the "ftse_data" index.

      {
        "_index": "ftse_data",
        "_type": "doc",
        "_id": "AgRskWUBsYalOV9y9hL0",
        "_score": 1,
        "_source": {
          "FTSE": 2448.2
        }
      }

Check the original documents index

GET /stocks_original/_search

This will display documents with the following structure. Notice that the original unfiltered version of the documents appears in the "stocks_original" index.

      {
        "_index": "stocks_original",
        "_type": "doc",
        "_id": "-QRskWUBsYalOV9y9hFo",
        "_score": 1,
        "_source": {
          "host": "Alexanders-MBP",
          "@timestamp": "2017-01-01T00:30:00.000Z",
          "SMI": 1678.1,
          "@version": "1",
          "message": "1483230600,1628.75,1678.1,1772.8,2443.6",
          "CAC": 1772.8,
          "DAX": 1628.75,
          "time": "1483230600",
          "path": "/Users/arm/Documents/ES6.3/datasets/stocks_for_clones.csv",
          "FTSE": 2443.6
        }
      }

Conclusion

In this blog post I have demonstrated a small subset of the capabilities of Logstash. Specifically, I presented an example that showed how to use Logstash to ingest data from multiple stock markets, and then to process and route that data to distinct outputs. If you are testing Logstash and the Elastic Stack and have questions, feel free to seek help on our public discussion forums.