Enrich your Elasticsearch documents with Logstash

We saw in the previous post that we can do data enrichment within Elasticsearch® with the Elasticsearch Enrich Processor in an ingest pipeline. But sometimes you need to perform more complex tasks, or your source of data is not Elasticsearch but another system. Or maybe you want to send the data to Elasticsearch and also to a third-party system. In those cases, moving the execution of your pipeline to Logstash® makes a lot of sense.

Enriching Elasticsearch data with Elasticsearch

With Logstash, that is super easy with a pipeline similar to this:

input {
  # Read all documents from Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "kibana_sample_data_logs"
    docinfo => true
    ecs_compatibility => "disabled"
  }
}
filter {
  # Enrich every document with Elasticsearch
  elasticsearch {
    hosts => ["${ELASTICSEARCH_URL}"]
    user => "elastic"
    password => "${ELASTIC_PASSWORD}"
    index => "vip"
    query => "ip:%{[clientip]}"
    sort => "ip:desc"
    fields => {
      "[name]" => "[name]"
      "[vip]" => "[vip]"
    }
  }
  mutate { 
    remove_field => ["@version", "@timestamp"] 
  }
}
output {
  if [name] {
    # Write all modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
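
This pipeline reads the Elasticsearch connection settings from environment variables. If you want to reproduce it, they can be exported in your shell first (placeholder values, obviously):

export ELASTICSEARCH_URL="https://your-cluster.example.com:9243"
export ELASTIC_PASSWORD="your-elastic-password"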

In total, we have 14,074 events to parse. Not a lot, but enough for this demo. Here's a sample event:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  }
}
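
If you want to double-check the document count on your side, the _count API does the trick, reusing the same environment variables:

curl -u "elastic:$ELASTIC_PASSWORD" "$ELASTICSEARCH_URL/kibana_sample_data_logs/_count?pretty"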

As we saw in the previous post, the vip index contains information about our customers:

{ 
  "ip" : "30.156.16.164", 
  "vip": true, 
  "name": "David P" 
}
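
If you don't have that index around anymore, an entry like the one above can be recreated with a simple index request (illustrative; the full data set comes from the previous post):

curl -XPOST -u "elastic:$ELASTIC_PASSWORD" "$ELASTICSEARCH_URL/vip/_doc?pretty" \
  -H 'Content-Type: application/json' \
  -d '{ "ip": "30.156.16.164", "vip": true, "name": "David P" }'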

With the pipeline configuration above saved in a local logstash-config/pipeline/ directory, we can run it with:

time docker run \
  --name=logstash \
  --rm -it \
  -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  -e XPACK_MONITORING_ENABLED=false \
  -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  docker.elastic.co/logstash/logstash:8.12.0

The enriched document is now looking like this:

{
  "agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24",
  "bytes": 1831,
  "clientip": "30.156.16.164",
  "extension": "",
  "geo": {
    "srcdest": "US:IN",
    "src": "US",
    "dest": "IN",
    "coordinates": {
      "lat": 55.53741389,
      "lon": -132.3975144
    }
  },
  "host": "elastic-elastic-elastic.org",
  "index": "kibana_sample_data_logs",
  "ip": "30.156.16.163",
  "machine": {
    "ram": 9663676416,
    "os": "win xp"
  },
  "memory": 73240,
  "message": "30.156.16.163 - - [2018-09-01T12:43:49.756Z] \"GET /wp-login.php HTTP/1.1\" 404 1831 \"-\" \"Mozilla/5.0 (X11; Linux i686) AppleWebKit/534.24 (KHTML, like Gecko) Chrome/11.0.696.50 Safari/534.24\"",
  "phpmemory": 73240,
  "referer": "http://www.elastic-elastic-elastic.com/success/timothy-l-kopra",
  "request": "/wp-login.php",
  "response": 404,
  "tags": [
    "success",
    "info"
  ],
  "timestamp": "2023-03-18T12:43:49.756Z",
  "url": "https://elastic-elastic-elastic.org/wp-login.php",
  "utc_time": "2023-03-18T12:43:49.756Z",
  "event": {
    "dataset": "sample_web_logs"
  },
  "vip": true,
  "name": "David P"
}
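
Once the run is over, such enriched documents can be found back in Elasticsearch with a simple query, for example (assuming, as configured above, that they were written back to the same index):

curl -u "elastic:$ELASTIC_PASSWORD" "$ELASTICSEARCH_URL/kibana_sample_data_logs/_search?q=vip:true&size=1&pretty"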

It's actually easy, but there's a problem: it's slow. Doing a lookup over the network for every event, even though Elasticsearch is blazing fast, still slows down the whole pipeline.

Using a static JDBC filter

I recently met Laurent, from the amazing Elastic Consulting team, at the ParisJUG, and we spoke about that problem. He told me that one of his customers had faced that very issue, and he suggested caching the Elasticsearch data inside Logstash instead.

The problem is that there's no such cache filter plugin in Logstash. He found a very smart way to address it by leveraging the Static JDBC filter plugin together with the Elasticsearch JDBC driver.

Note that this requires a Platinum license (or a trial).
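
If you are only testing, a 30-day trial, which unlocks the Platinum features on a self-managed cluster, can be started with the license API, for example:

curl -XPOST -u "elastic:$ELASTIC_PASSWORD" "$ELASTICSEARCH_URL/_license/start_trial?acknowledge=true&pretty"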

Adding the Elasticsearch JDBC driver

We first need to add the Elasticsearch JDBC driver to our Logstash instance.

mkdir -p logstash-config/lib
wget https://artifacts.elastic.co/maven/org/elasticsearch/plugin/x-pack-sql-jdbc/8.12.0/x-pack-sql-jdbc-8.12.0.jar
mv x-pack-sql-jdbc-8.12.0.jar logstash-config/lib

We just need to share this directory with our Logstash Docker instance. It is mounted as /tmp/lib, which is the path we will reference later in the jdbc_driver_library option.

time docker run \
  --name=logstash \
  --rm -it \
  -v $(pwd)/logstash-config/pipeline/:/usr/share/logstash/pipeline/ \
  -v $(pwd)/logstash-config/lib/:/tmp/lib/ \
  -e XPACK_MONITORING_ENABLED=false \
  -e ELASTICSEARCH_URL="$ELASTICSEARCH_URL" \
  -e ELASTIC_PASSWORD="$ELASTIC_PASSWORD" \
  docker.elastic.co/logstash/logstash:8.12.0

Updating the pipeline

The input part does not change. But now we want to create a temporary in-memory table named vip (for consistency). The table's structure is defined with the local_db_objects parameter:

jdbc_static {
  local_db_objects => [ {
    name => "vip"
    index_columns => ["ip"]
    columns => [
      ["name", "VARCHAR(255)"],
      ["vip", "BOOLEAN"],
      ["ip", "VARCHAR(64)"]
    ]
  } ]
}

When the jdbc_static filter starts, we first want to load the whole data set from the Elasticsearch vip index into that local table. This is done with the loaders option:

jdbc_static {
  loaders => [ {
    query => "select name, vip, ip from vip"
    local_table => "vip"
  } ]
  jdbc_user => "elastic"
  jdbc_password => "${ELASTIC_PASSWORD}"
  jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
  jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
  jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
}

Every time we need to do a lookup, we want to run the following statement against the local table:

SELECT name, vip FROM vip WHERE ip = 'THE_IP'
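
Since the local vip table is a copy of the vip index, you can preview what such a lookup returns by running the equivalent query through the Elasticsearch SQL API, here with the sample IP from earlier:

curl -XPOST -u "elastic:$ELASTIC_PASSWORD" "$ELASTICSEARCH_URL/_sql?format=txt" \
  -H 'Content-Type: application/json' \
  -d "{\"query\": \"SELECT name, vip FROM vip WHERE ip = '30.156.16.164'\"}"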

This can be defined using the local_lookups parameter:

jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip"
  } ]
}

If no data is found, we can provide a default value using the default_hash option:

jdbc_static {
  local_lookups => [ {
    query => "SELECT name, vip FROM vip WHERE ip = :ip"
    parameters => { "ip" => "clientip" }
    target => "vip" 
    default_hash => {
      name => nil
      vip => false
    }
  } ]
}

The lookup stores its results in the event under the vip target field, as an array of matching rows, so the values end up in [vip][0][name] and [vip][0][vip].

We can now copy those values to top-level name and vip fields and remove the temporary vip field:

jdbc_static {
  add_field => { name => "%{[vip][0][name]}" }
  add_field => { vip => "%{[vip][0][vip]}" }
  remove_field => ["vip"]
}

This gives the following filter:

filter {
  # Enrich every document with Elasticsearch via static JDBC
  jdbc_static {
    loaders => [ {
      query => "select name, vip, ip from vip"
      local_table => "vip"
    } ]
    local_db_objects => [ {
      name => "vip"
      index_columns => ["ip"]
      columns => [
        ["name", "VARCHAR(255)"],
        ["vip", "BOOLEAN"],
        ["ip", "VARCHAR(64)"]
      ]
    } ]
    local_lookups => [ {
      query => "SELECT name, vip FROM vip WHERE ip = :ip"
      parameters => { "ip" => "clientip" }
      target => "vip" 
      default_hash => {
        name => nil
        vip => false
      }
    } ]
    add_field => { name => "%{[vip][0][name]}" }
    add_field => { vip => "%{[vip][0][vip]}" }
    remove_field => ["vip"]
    jdbc_user => "elastic"
    jdbc_password => "${ELASTIC_PASSWORD}"
    jdbc_driver_class => "org.elasticsearch.xpack.sql.jdbc.EsDriver"
    jdbc_driver_library => "/tmp/lib/x-pack-sql-jdbc-8.12.0.jar"
    jdbc_connection_string => "jdbc:es://${ELASTICSEARCH_URL}"
  }
  mutate { 
    remove_field => ["@version", "@timestamp"] 
  }
}

Writing modified documents to Elasticsearch

In the first pipeline, we were testing if the name field actually exists in the event:

if [name] {
  # Index to Elasticsearch
}

We could still use something similar, but because we provided default values for when the ip cannot be found in the vip table, the filter now adds a _jdbcstaticdefaultsused tag to the tags field whenever the defaults are used.

We can use that tag to know whether we found a match and, if we did, send the document to Elasticsearch:

output {
  if "_jdbcstaticdefaultsused" not in [tags] {
    # Write all the modified documents to Elasticsearch
    elasticsearch {
      manage_template => false
      hosts => ["${ELASTICSEARCH_URL}"]
      user => "elastic"
      password => "${ELASTIC_PASSWORD}"
      index => "%{[@metadata][_index]}"
      document_id => "%{[@metadata][_id]}"
    }
  }
}
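
After this run, a quick way to see how many documents were actually enriched and written back is a filtered count, something like:

curl -u "elastic:$ELASTIC_PASSWORD" "$ELASTICSEARCH_URL/kibana_sample_data_logs/_count?q=vip:true&pretty"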

Is it faster?

When we run the test on this small data set with the Elasticsearch filter approach, it takes a bit more than two minutes to enrich it:

real    2m3.146s
user    0m0.077s
sys     0m0.042s

When running the pipeline with the JDBC static filter approach, it now takes less than a minute:

real    0m48.575s
user    0m0.064s
sys     0m0.039s

As we can see, we have significantly reduced the execution time of this enrichment pipeline (a gain of around 60%).

You could try this strategy (or a similar one) if you have a tiny Elasticsearch index that can easily fit in the Logstash JVM memory. If you have hundreds of millions of documents, you should stick with the Elasticsearch filter plugin.

Conclusion

In this post, we saw how we can use the JDBC static filter plugin to speed up our data enrichment pipeline when we need to perform some lookups in Elasticsearch. In the next post, we will see how we can do similar enrichment on the edge with the Elastic Agent.

The release and timing of any features or functionality described in this post remain at Elastic's sole discretion. Any features or functionality not currently available may not be delivered on time or at all.