How to

What's New with Elasticsearch Ingest Node in 6.5

Elasticsearch ingest node is a type of Elasticsearch node which can be used to process documents prior to indexing. Ingest nodes are part of the Elasticsearch cluster and have been available since the 5.0 release. Recently, the ingest node gained some new capabilities and in this blog we are going to highlight those changes.

Let's start with a brief Ingest Node refresher.

The ingest node works by defining a pipeline, and then processors within that pipeline. There are about 30 supported processors and they are available via plugins or directly from Elasticsearch's distribution.

For example, a simple pipeline that renames a field:

PUT _ingest/pipeline/renamer
{
  "description": "A pipeline to rename fields",
  "processors": [
    {
      "rename": {
        "field": "provider",
        "target_field": "cloud.provider"
      }
    }
  ]
}

A pipeline can have one or many processors which will be executed in the order in which they are defined. To use a pipeline simply add ?pipeline=<pipeline_name> to an index request to run the document through that pipeline.

PUT test/_doc/1?pipeline=renamer
{
  "provider": "aws",
  "source": "billing"
}

In this case the provider field has been renamed to cloud.provider

{
...
  "_source" : {
    "cloud" : {
      "provider" : "aws"
    },
    "source" : "billing"
  }
}

Perhaps the most exciting new capability for Ingest node is the introduction of conditional execution of a processor.

"if" - Conditional execution of a processor

Prior to 6.5, there was no way to skip a processor from running. Using ignore_failure, ignore_missing, or on_failure could be used in some cases as a workaround, but there was never an easy way to conditionally run or not run a processor in a pipeline.

In 6.5, every processor now has an optional if field that can be set to configure when a processor should be executed or not. The value of the if key is a Painless script running in the Ingest processor context that needs to evaluate to true or false.

For example, using the example above we can now conditionally rename provider to cloud.provider if the document has a field named source and it’s value is billing.

PUT _ingest/pipeline/renamer
{
  "description": "A pipeline to conditionally rename fields",
  "processors": [
    {
      "rename": {
        "if": "ctx.source == 'billing'",
        "field": "provider",
        "target_field": "cloud.provider"
      }
    }
  ]
}

Since the contents of the conditional is a painless script, you have the full power of Painless to help evaluate the condition. The following is a slightly better version of the same condition that uses Painless's null safe operator and toLowerCase () to ensure that the condition is not case sensitive.

"if": "ctx.source?.toLowerCase() == 'billing'",

More information can be found in the ingest node conditionals documentation.

Drop Processor

The drop processor will prevent a document from being indexed. A drop processor didn't make much sense without the ability to conditionally execute it (since otherwise, it would drop all documents). Now that the Ingest node can conditionally execute a processor, it can also conditionally drop documents to prevent them from getting indexed.

For example, maybe we don't care about documents from marketing. We can drop them.

PUT _ingest/pipeline/dropper
{
  "description": "A pipeline to drop documents",
  "processors": [
    {
      "drop": {
        "if": "ctx.source?.toLowerCase() == 'marketing'"
      }
    }
  ]
}

Since the ingest node runs prior to indexing, these documents from marketing never make it to the index. See the drop processor documentation for more information.

Pipeline processor

The pipeline processor is a special processor that only calls other processors. Given our two examples above where we have a renamer pipeline and dropper pipeline, it would be nice if we can re-use those pipelines from other pipelines. This can help to prevent a lot of copy/paste of processors and allows for better organization of pipelines.

PUT _ingest/pipeline/master
{
  "description": "A master pipeline that calls other pipelines",
  "processors": [
    {
      "pipeline": {
        "name": "dropper"
      }
    },
    {
      "pipeline": {
        "name": "renamer"
      }
    }
  ]
}

Indexing to the pipeline (?pipeline=master) will drop the document if the source is marketing, and will rename the field if the source is billing. See the pipeline processor reference documentation and Conditionals with the Pipeline Processor .

Dissect Processor

The dissect processor is an alternative to the Grok processor to dissect a log line into its parts. It isn't quite as flexible as Grok, but it can solve many parsing use cases with a simpler syntax and may perform better than Grok since it doesn't use regular expressions. The dissect processor may sound familiar since Beats and Logstash also have dissect processors (a filter in Logstash terms). If you need to parse log lines into their parts, and the log lines don't deviate much from each other, dissect may be the best option.

To parse the following log line:

1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] "GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0" 200 3171

A pipeline with a Dissect processor can be configured as follows

PUT _ingest/pipeline/dissecter
{
  "processors": [
    {
      "dissect": {
        "field": "message",
        "pattern": "%{clientip} %{ident} %{auth} [%{@timestamp}] \"%{verb} %{request} HTTP/%{httpversion}\" %{status} %{size}"
      }
    }
  ]
}

Resulting in the following document that will be indexed:

{
...
  "_source" : {
    "request" : "/english/venues/cities/images/montpellier/18.gif",
    "auth" : "-",
    "ident" : "-",
    "verb" : "GET",
    "message" : "1.2.3.4 - - [30/Apr/1998:22:00:52 +0000] \"GET /english/venues/cities/images/montpellier/18.gif HTTP/1.0\" 200 3171",
    "@timestamp" : "30/Apr/1998:22:00:52 +0000",
    "size" : "3171",
    "clientip" : "1.2.3.4",
    "httpversion" : "1.0",
    "status" : "200"
  }
}

See the dissect processor documentation to learn more.

Other Changes

In addition to the above changes the following improvements came along with 6.5:

  • The ability to set a pipeline as a default pipeline for a given index.
  • Better documentation that now includes all options for each processor.
  • Improved node level stats which include per processor metrics.
  • Updates to Kibana Console's auto-complete for the new features.
  • A Rally challenge has been added to measure grok performance over time. See Elasticsearch Benchmarks.