September 27, 2016 Technical

A New Way To Ingest - Part 1

By Christoph Wurm

With the upcoming release of version 5.0 of the Elastic Stack, it is time we took a closer look at how to use one of the new features, Ingest Nodes.

What are Ingest Nodes?

Ingest Nodes are a new type of Elasticsearch node that you can use to perform common data transformations and enrichments before a document is indexed.

Each task is represented by a processor. Processors are configured to form pipelines.

At the time of writing, the Ingest Node had 20 built-in processors, for example grok, date, gsub, lowercase/uppercase, remove and rename. You can find a full list in the documentation.

Besides those, there are currently also three Ingest plugins:

  1. Ingest Attachment converts binary documents such as PowerPoint presentations, Excel spreadsheets, and PDF documents to text and metadata
  2. Ingest Geoip looks up the geographic locations of IP addresses in an internal database
  3. Ingest User Agent parses and extracts information from the user agent strings that browsers and other applications send when using HTTP

Create and Use an Ingest Pipeline

You configure a new ingest pipeline with the _ingest API endpoint.


Note: Throughout this blog post, when showing requests to Elasticsearch we are using the format of Console.


PUT _ingest/pipeline/rename_hostname
{
  "processors": [
    {
      "rename": {
        "field": "hostname",
        "target_field": "host",
        "ignore_missing": true
      }
    }
  ]
}

In this example, we configure a pipeline called rename_hostname that takes the field hostname and renames it to host. If the hostname field does not exist, the processor continues without error.
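To make the behaviour concrete, here is a small local sketch in Python of what the pipeline above does to a document. It is only an illustration of the two behaviours described here (rename, and pass through when the field is missing), not how Elasticsearch implements processors; the function names rename and run_pipeline are made up for this example.

```python
def rename(doc, field, target_field, ignore_missing=False):
    """Return a copy of doc with `field` renamed to `target_field`."""
    if field not in doc:
        if ignore_missing:
            # Nothing to rename: pass the document through unchanged.
            return dict(doc)
        raise KeyError(f"field [{field}] does not exist")
    result = dict(doc)
    result[target_field] = result.pop(field)
    return result


def run_pipeline(doc, processors):
    """Apply each processor in order, feeding its output to the next one."""
    for processor in processors:
        doc = processor(doc)
    return doc


# Hypothetical local stand-in for the rename_hostname pipeline.
rename_hostname = [
    lambda doc: rename(doc, "hostname", "host", ignore_missing=True),
]

print(run_pipeline({"hostname": "myserver"}, rename_hostname))
print(run_pipeline({"other": 1}, rename_hostname))
```

The second call shows the effect of ignore_missing: the document has no hostname field, so it comes out unchanged instead of raising an error.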

There are several ways to use this pipeline.


When using plain Elasticsearch APIs, you specify the pipeline parameter in the query string, e.g.:


POST server/values/?pipeline=rename_hostname
{
  "hostname": "myserver"
}
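If you are calling Elasticsearch from your own code rather than from Console, the same request could be built roughly like this. This is a sketch using only the Python standard library; the address http://localhost:9200 and the helper name build_index_request are assumptions for illustration, and the request is only constructed here, not sent.

```python
import json
from urllib.parse import urlencode
from urllib.request import Request


def build_index_request(base_url, index, doc_type, document, pipeline):
    """Build a POST request that indexes `document` through `pipeline`."""
    # The pipeline name travels in the query string, as in the Console example.
    query = urlencode({"pipeline": pipeline})
    url = f"{base_url}/{index}/{doc_type}?{query}"
    body = json.dumps(document).encode("utf-8")
    return Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


# Assumed address of a local node; adjust to your own cluster.
request = build_index_request(
    "http://localhost:9200", "server", "values",
    {"hostname": "myserver"}, "rename_hostname",
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```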

In Logstash, you add the pipeline parameter to the elasticsearch output:

output {
  elasticsearch {
    hosts => "192.168.100.39"
    index => "server"
    pipeline => "rename_hostname"
  }
}

Similarly, you add a parameter to the elasticsearch output of any Beat:

output.elasticsearch:
  hosts: ["192.168.100.39:9200"]
  index: "server"
  pipeline: "rename_hostname"

Note: In alpha versions of 5.0, you had to use parameters.pipeline in the Beats configuration.

Simulate

When configuring a new pipeline, it is valuable to be able to test it before feeding it real data, rather than discovering only then that it throws an error.


For that, there is the Simulate API:


POST _ingest/pipeline/rename_hostname/_simulate
{
  "docs": [
    {
      "_source": {
        "hostname": "myserver"
      }
    }
  ]
}

The result shows us that our field has been successfully renamed:


        [...]
        "_source": {
          "host": "myserver"
        },
        [...]
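When testing a pipeline against many sample documents, it can help to generate the Simulate request body and pick the transformed _source out of the response programmatically. The following Python sketch shows one way to do that. The helper names are hypothetical, and the response used here is a hand-written stand-in containing only the keys shown in this post, not output captured from a cluster.

```python
def build_simulate_body(sources):
    """Wrap plain documents in the {"docs": [{"_source": ...}]} envelope."""
    return {"docs": [{"_source": source} for source in sources]}


def extract_sources(response):
    """Pull the transformed _source out of each simulated document."""
    return [entry["doc"]["_source"] for entry in response["docs"]]


body = build_simulate_body([{"hostname": "myserver"}])
print(body)

# Hand-written stand-in for a Simulate API response (assumption, trimmed
# to the keys this sketch reads).
sample_response = {"docs": [{"doc": {"_source": {"host": "myserver"}}}]}
print(extract_sources(sample_response))
```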

A real-world example: Web Logs

Let’s turn to something from the real world: Web logs.


This is an example of an access log in the Combined Log Format supported by both Apache httpd and nginx:


212.87.37.154 - - [12/Sep/2016:16:21:15 +0000] "GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36"

As you can see, it contains several pieces of information: IP address, timestamp, a user agent string, and so on.

To allow fast search and visualisation, we need to give every piece its own field in Elasticsearch. It would also be useful to know where this request is coming from. We can do all this with the following Ingest pipeline.


PUT _ingest/pipeline/access_log
{
  "description" : "Ingest pipeline for Combined Log Format",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": [ "dd/MMM/YYYY:HH:mm:ss Z" ]
      }
    },
    {
      "geoip": {
        "field": "clientip"
      }
    },
    {
      "user_agent": {
        "field": "agent"
      }
    }
  ]
}

It contains a total of four processors:

  1. grok uses a regular expression to parse the whole log line into individual fields.
  2. date parses the timestamp field and stores the result in @timestamp as the timestamp of the document.
  3. geoip takes the IP address of the requester and looks it up in an internal database to determine its geographical location.
  4. user_agent takes the user agent string and splits it up into individual components.
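To get a feel for what the first two processors do, here is a rough local equivalent in Python. It is a sketch only: the regular expression is a hand-written approximation of the grok pattern above, not what grok expands to, and the names COMBINED_LOG and parse_line are invented for this example. The geoip and user_agent steps are left out because they depend on databases that ship with the plugins.

```python
import re
from datetime import datetime

# Hand-written approximation of the grok pattern (an assumption, not the
# exact expansion of the grok definitions).
COMBINED_LOG = re.compile(
    r'(?P<clientip>\S+) (?P<ident>\S+) (?P<auth>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<verb>\w+) (?P<request>\S+) HTTP/(?P<httpversion>[\d.]+)" '
    r'(?P<response>\d+) (?:-|(?P<bytes>\d+)) '
    r'(?P<referrer>"[^"]*") (?P<agent>"[^"]*")'
)


def parse_line(line):
    """Split one Combined Log Format line into fields, like grok + date."""
    match = COMBINED_LOG.match(line)
    if match is None:
        raise ValueError("line is not in Combined Log Format")
    # Drop groups that did not take part in the match (bytes may be "-").
    fields = {k: v for k, v in match.groupdict().items() if v is not None}
    fields["response"] = int(fields["response"])
    if "bytes" in fields:
        fields["bytes"] = int(fields["bytes"])
    # %b expects English month abbreviations under the default C locale.
    parsed = datetime.strptime(fields["timestamp"], "%d/%b/%Y:%H:%M:%S %z")
    fields["@timestamp"] = parsed.isoformat()
    return fields


SAMPLE_LINE = (
    '212.87.37.154 - - [12/Sep/2016:16:21:15 +0000] '
    '"GET /favicon.ico HTTP/1.1" 200 3638 "-" '
    '"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36"'
)

for name, value in parse_line(SAMPLE_LINE).items():
    print(f"{name}: {value}")
```

As with grok's QS pattern, the referrer and agent values keep their surrounding double quotes.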

Since the last two processors are plugins that do not ship with Elasticsearch by default, we will have to install them first:


bin/elasticsearch-plugin install ingest-geoip
bin/elasticsearch-plugin install ingest-user-agent

To test our pipeline, we can again use the Simulate API (the double quotes inside message have to be escaped):


POST _ingest/pipeline/access_log/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "212.87.37.154 - - [12/Sep/2016:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\""
      }
    }
  ]
}
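Escaping the quotes by hand is error-prone. If you build the request body in code, a JSON serializer takes care of it for you. A minimal Python sketch, with the user agent string shortened for readability:

```python
import json

# Raw log line as it appears in the log file (user agent shortened here).
raw_line = (
    '212.87.37.154 - - [12/Sep/2016:16:21:15 +0000] '
    '"GET /favicon.ico HTTP/1.1" 200 3638 "-" "Mozilla/5.0"'
)

# json.dumps escapes the embedded double quotes as \" automatically.
body = json.dumps({"docs": [{"_source": {"message": raw_line}}]})
print(body)
```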

The result from Elasticsearch shows us that this worked:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "request": "/favicon.ico",
          "agent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"",
          "geoip": {
            "continent_name": "Europe",
            "city_name": null,
            "country_iso_code": "DE",
            "region_name": null,
            "location": {
              "lon": 9,
              "lat": 51
            }
          },
          "auth": "-",
          "ident": "-",
          "verb": "GET",
          "message": "212.87.37.154 - - [12/Sep/2016:16:21:15 +0000] \"GET /favicon.ico HTTP/1.1\" 200 3638 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/52.0.2743.116 Safari/537.36\"",
          "referrer": "\"-\"",
          "@timestamp": "2016-09-12T16:21:15.000Z",
          "response": 200,
          "bytes": 3638,
          "clientip": "212.87.37.154",
          "httpversion": "1.1",
          "user_agent": {
            "patch": "2743",
            "major": "52",
            "minor": "0",
            "os": "Mac OS X 10.11.6",
            "os_minor": "11",
            "os_major": "10",
            "name": "Chrome",
            "os_name": "Mac OS X",
            "device": "Other"
          },
          "timestamp": "12/Sep/2016:16:21:15 +0000"
        },
        "_ingest": {
          "timestamp": "2016-09-13T14:35:58.746+0000"
        }
      }
    }
  ]
}

Next

In the second part we will show how to set up an ingestion pipeline using Filebeat, Elasticsearch and Kibana to ingest and visualize web logs.