15 November 2016 Engineering

A New Way To Ingest - Part 2

By Christoph Wurm

This is the second part of a two-part series about ingest nodes, a new feature in Elasticsearch 5.0.

In the first part we talked about what ingest nodes are, and how to configure and use them. In this second part we will focus on how to use ingest nodes as part of a deployment of the Elastic Stack.

A quick word about architecture

Many types of data have to be changed in some way (fields added or removed, text fields parsed, etc.) before it is indexed into Elasticsearch. The tool of choice for this task has so far been Logstash. With the ingest node in Elasticsearch, there is now an alternative for some cases: connecting one of the Beats directly to Elasticsearch and doing the transformations in the ingest node.


Screen Shot 2016-11-07 at 18.41.10.png

By default, the ingest functionality is enabled on any node. If you know you’ll have to process a lot of documents you might want to start a dedicated Elasticsearch node that will only be used for running ingest pipelines. To do that, switch off the master and data node types for that node in elasticsearch.yml (node.master: false and node.data: false) and switch off the ingest node type for all other nodes (node.ingest: false).

When consuming data from either Filebeat or Winlogbeat it might also no longer be necessary to use a message queue. Both of these Beats (but not others at this time) can handle backpressure: If Elasticsearch can’t keep up with indexing requests due to a large burst of events, these Beats will fall behind in sending the most recent data but will maintain a pointer to the last successfully indexed position in the files (Filebeat) or the event log (Winlogbeat). Once Elasticsearch has again caught up with the indexing load, the Beats will catch up to the most recent events.


It might still be a good idea to use a message queue such as Kafka or Redis, for example to decouple Beats and Elasticsearch architecturally or to be able to take Elasticsearch offline, e.g. for an upgrade. Keep in mind though that the ingest node is a push-based system, it will not read from a message queue. So when using a queue, it’s necessary to use Logstash to push the data from the queue into Elasticsearch and its ingest node.

Configure Filebeat and Elasticsearch

To deploy the new architecture described above we need to configure three components: Filebeat, Elasticsearch (incl. the ingest node) and Kibana. We’ll continue to use the example of ingesting web access logs from an Apache httpd web server.


We’ll take the configuration of each component in turn:

Elasticsearch

All the configuration was already described in Part 1 of this blog post series, but for reference we’ll include the relevant pieces here as well.


We’ll be using two ingest plugins that need to be installed first:


bin/elasticsearch-plugin install ingest-geoip

bin/elasticsearch-plugin install ingest-user-agent


Having done that successfully, we can create an ingest pipeline for web access logs (the format is that used by Kibana’s Console):


PUT _ingest/pipeline/access_log
{
  "description" : "Ingest pipeline for Combined Log Format",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": [ "dd/MMM/YYYY:HH:mm:ss Z" ]
      }
    },
    {
      "geoip": {
        "field": "clientip"
      }
    },
    {
      "user_agent": {
        "field": "agent"
      }
    }
  ]
}

Note that the order of processors is important, as they are executed one after the other for each incoming document. First we use grok to extract the fields (clientip, timestamp, bytes, etc.), and then we use the appropriate processors (date, geoip, user_agent) on the extracted data.

Filebeat

Filebeat is used to collect the log messages line by line from the log file. In the filebeat.yml configuration file this will look something like this:


filebeat.prospectors:
- input_type: log
  paths:
    - /var/log/apache2/access_log

Further down, we can configure the Elasticsearch output:


output.elasticsearch:
  hosts: [<insert-your-es-host>:<es-port>]
  template.name: "access_log"
  template.path: <insert-path-to-template>
  pipeline: access_log

template.path has to point to a file containing an Elasticsearch template. Filebeat ships with a file called filebeat.template.json that you can modify. For the example in this blog post, we need to add one field to the properties section:


"geoip": {
  "properties": {
    "location": {
      "type": "geo_point"
    }
  }
}

This will allow us to visualise the locations of IP addresses in our logs on a map in Kibana later.

Ingest the data

We’re good to go! We can run Filebeat to start indexing logs into our Elasticsearch:


./filebeat -c filebeat.yml -e


Note: -e causes Filebeat to log to stderr instead of a log file and shouldn’t be used in production.


Using the _cat API we can check that the data has been successfully ingested:


GET _cat/indices/filebeat*?v

health status index               uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   filebeat-2016.10.28 W_p-RZwQRWWn_S49jwaTFw   5   1     300000            0    212.7mb        212.7mb

Use it in Kibana

To use the data in Kibana, we'll configure an index pattern for filebeat-*. These configurations are at Management -> Index Pattern.


Screen Shot 2016-10-28 at 17.05.29.png


To explore the data a bit, we can use Kibana’s Discover view. The search bar allows us to explore website visits by country, requested resource and other criteria.


For example, in this dataset we can search for visits to the blog section of the website from Amsterdam using the query request:blog AND geoip.city_name:"Amsterdam":


Screen Shot 2016-10-28 at 17.23.23.png


In the Visualize section, we can build a pie chart of Top Countries:


Screen Shot 2016-10-28 at 17.32.12.png


We can build complex visualisations in Timelion, e.g. with this query we see the fluctuations in the amount of requests from the North America and Europe overlaid with the number of all requests.


.es().bars(10).label(World) .es(q="geoip.continent_name:North America", metric=count).points(10, fill=10).label("North America") .es(q="geoip.continent_name:Europe", metric=count).points(10, fill=10).label(Europe)


Screen Shot 2016-10-28 at 17.40.52.png


A complete dashboard for web traffic might look something like this:


Screen Shot 2016-11-01 at 01.30.13.png


Conclusion


With the ingest node, there is now a way to transform data inside Elasticsearch before indexing it. This is especially useful if only simpler operations are required, while more complex ones can still be performed using Logstash. Written in Java, operations performed in the ingest node are very efficient.


When you don’t need the additional power and flexibility of Logstash filters, this allows you to simplify your architecture for simpler use cases. And with Kibana and Timelion as one of its new built-in features you have the perfect tool for visualising the data.