Ingest pipelinesedit

Ingest pipelines let you perform common transformations on your data before indexing. For example, you can use pipelines to remove fields, extract values from text, and enrich your data.

A pipeline consists of a series of configurable tasks called processors. Each processor runs sequentially, making specific changes to incoming documents. After the processors have run, Elasticsearch adds the transformed documents to your data stream or index.

Ingest pipeline diagram

You can create and manage ingest pipelines using Kibana’s Ingest Node Pipelines feature or the ingest APIs. Elasticsearch stores pipelines in the cluster state.

Prerequisitesedit

  • Nodes with the ingest node role handle pipeline processing. To use ingest pipelines, your cluster must have at least one node with the ingest role. For heavy ingest loads, we recommend creating dedicated ingest nodes.
  • If the Elasticsearch security features are enabled, you must have the manage_pipeline cluster privilege to manage ingest pipelines. To use Kibana’s Ingest Node Pipelines feature, you also need the cluster:monitor/nodes/info cluster privileges.
  • Pipelines including the enrich processor require additional setup. See Enrich your data.

Create and manage pipelinesedit

In Kibana, open the main menu and click Stack Management > Ingest Node Pipelines. From the list view, you can:

  • View a list of your pipelines and drill down into details
  • Edit or clone existing pipelines
  • Delete pipelines

To create a new pipeline, click Create a pipeline. For an example tutorial, see Example: Parse logs.

Kibana’s Ingest Node Pipelines list view

You can also use the ingest APIs to create and manage pipelines. The following create pipeline API request creates a pipeline containing two set processors followed by a lowercase processor. The processors run sequentially in the order specified.

PUT _ingest/pipeline/my-pipeline
{
  "description": "My pipeline description",
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    },
    {
      "set": {
        "field": "my-boolean-field",
        "value": true
      }
    },
    {
      "lowercase": {
        "field": "my-keyword-field"
      }
    }
  ]
}

Manage pipeline versionsedit

When you create or update a pipeline, you can specify an optional version integer. Elasticsearch doesn’t use this version number internally, but you can use it to track changes to a pipeline.

PUT /_ingest/pipeline/my-pipeline-id
{
  "version" : 1,
  "processors": [ ... ]
}

To unset the version number using the API, replace or update the pipeline without specifying the version parameter.

Test a pipelineedit

Before using a pipeline in production, we recommend you test it using sample documents. When creating or editing a pipeline in Kibana, click Add documents. In the Documents tab, provide sample documents and click Run the pipeline.

Test a pipeline in Kibana

You can also test pipelines using the simulate pipeline API. You can specify a configured pipeline in the request path. For example, the following request tests my-pipeline.

POST _ingest/pipeline/my-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

Alternatively, you can specify a pipeline and its processors in the request body.

POST _ingest/pipeline/_simulate
{
  "pipeline" : {
    "processors": [
      {
        "lowercase": {
          "field": "my-keyword-field"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "my-keyword-field": "FOO"
      }
    },
    {
      "_source": {
        "my-keyword-field": "BAR"
      }
    }
  ]
}

The API returns transformed documents:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_doc",
        "_id": "_id",
        "_source": {
          "my-keyword-field": "foo"
        },
        "_ingest": {
          "timestamp": "2099-02-30T22:30:03.187Z"
        }
      }
    },
    {
      "doc": {
        "_index": "_index",
        "_type": "_doc",
        "_id": "_id",
        "_source": {
          "my-keyword-field": "bar"
        },
        "_ingest": {
          "timestamp": "2099-02-30T22:30:03.188Z"
        }
      }
    }
  ]
}

Add a pipeline to an indexing requestedit

Use the pipeline query parameter to apply a pipeline to documents in individual or bulk indexing requests.

POST my-data-stream/_doc?pipeline=my-pipeline
{
  "@timestamp": "2099-03-07T11:04:05.000Z",
  "my-keyword-field": "foo"
}

PUT my-data-stream/_bulk?pipeline=my-pipeline
{ "create":{ } }
{ "@timestamp": "2099-03-08T11:04:05.000Z", "my-keyword-field" : "foo" }
{ "create":{ } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "my-keyword-field" : "bar" }

You can also use the pipeline parameter with the update by query or reindex APIs.

POST my-data-stream/_update_by_query?pipeline=my-pipeline

POST _reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "my-new-data-stream",
    "op_type": "create",
    "pipeline": "my-pipeline"
  }
}

Set a default pipelineedit

Use the index.default_pipeline index setting to set a default pipeline. Elasticsearch applies this pipeline if no pipeline parameter is specified.

Set a final pipelineedit

Use the index.final_pipeline index setting to set a final pipeline. Elasticsearch applies this pipeline after the request or default pipeline, even if neither is specified.

Access source fields in a processoredit

Processors have read and write access to an incoming document’s source fields. To access a field key in a processor, use its field name. The following set processor accesses my-long-field.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "my-long-field",
        "value": 10
      }
    }
  ]
}

You can also prepend the _source prefix.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "_source.my-long-field",
        "value": 10
      }
    }
  ]
}

Use dot notation to access object fields.

If your document contains flattened objects, use the dot_expander processor to expand them first. Other ingest processors cannot access flattened objects.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "dot_expander": {
        "field": "my-object-field.my-property"
      }
    },
    {
      "set": {
        "field": "my-object-field.my-property",
        "value": 10
      }
    }
  ]
}

To access field values, enclose the field name in double curly brackets {{ }} to create a Mustache template snippet. You can use template snippets to dynamically set field names. The following processor sets a field name as the service field value.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "{{service}}",
        "value": "{{code}}"
      }
    }
  ]
}

Access metadata fields in a processoredit

Processors can access the following metadata fields by name:

  • _index
  • _id
  • _routing

For example, the following set processor sets the document’s routing value as the geoip.country_iso_code field value.

PUT _ingest/pipeline/my-pipeline
{
  "processors" : [
    {
      "set" : {
        "field": "_routing",
        "value": "{{geoip.country_iso_code}}"
      }
    }
  ]
}

Use a Mustache template snippet to access metadata field values. For example, {{_routing}} retrieves a document’s routing value.

If you automatically generate document IDs, you cannot use {{_id}} in a processor. Elasticsearch assigns auto-generated _id values after ingest.

Access ingest metadata in a processoredit

Ingest processors can add and access ingest metadata using the _ingest key.

Unlike source and metadata fields, Elasticsearch does not index ingest metadata fields by default. Elasticsearch also allows source fields that start with an _ingest key. If your data includes such source fields, use _source._ingest to access them.

Pipelines only create the _ingest.timestamp ingest metadata field by default. This field contains a timestamp of when Elasticsearch received the document’s indexing request. To index _ingest.timestamp or other ingest metadata fields, use the set processor.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "field": "received",
        "value": "{{_ingest.timestamp}}"
      }
    }
  ]
}

Handing pipeline failuresedit

A pipeline’s processors run sequentially. By default, pipeline processing stops when one of these processors fails or encounters an error.

To ignore a processor failure and run the pipeline’s remaining processors, set ignore_failure to true.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "ignore_failure": true
      }
    }
  ]
}

Use the on_failure parameter to specify a list of processors to run immediately after a processor failure. If on_failure is specified, Elasticsearch afterward runs the pipeline’s remaining processors, even if the on_failure configuration is empty.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error.message",
              "value": "field \"foo\" does not exist, cannot rename to \"bar\"",
              "override": false
            }
          }
        ]
      }
    }
  ]
}

Nest a list of on_failure processors for nested error handling.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error.message",
              "value": "field \"foo\" does not exist, cannot rename to \"bar\"",
              "override": false,
              "on_failure": [
                {
                  "set": {
                    "field": "error.message.multi",
                    "value": "Document encountered multiple ingest errors",
                    "override": true
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}

You can also specify on_failure for a pipeline. If a processor without an on_failure value fails, Elasticsearch uses this pipeline-level parameter as a fallback. Elasticsearch will not attempt to run the pipeline’s remaining processors.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "field": "_index",
        "value": "failed-{{ _index }}"
      }
    }
  ]
}

Additional information about the pipeline failure may be available in the document metadata fields on_failure_message, on_failure_processor_type, on_failure_processor_tag, and on_failure_pipeline. These fields are accessible only from within an on_failure block.

The following example uses the error metadata fields to provide additional information on the document about the failure.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

Conditionally run a processoredit

Each processor supports an optional if condition, written as a Painless script. If provided, the processor only runs when the if condition is true.

if condition scripts run in Painless’s ingest processor context. In if conditions, ctx values are read-only.

The following drop processor uses an if condition to drop documents with a network_name of Guest.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": "ctx?.network_name == 'Guest'"
      }
    }
  ]
}

If the script.painless.regex.enabled cluster setting is enabled, you can use regular expressions in your if condition scripts. For supported syntax, see Painless regular expressions.

If possible, avoid using regular expressions. Expensive regular expressions can slow indexing speeds.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "set": {
        "if": "ctx.href?.url =~ /^http[^s]/",
        "field": "href.insecure",
        "value": true
      }
    }
  ]
}

You must specify if conditions as valid JSON on a single line. However, you can use the Kibana console's triple quote syntax to write and debug larger scripts.

If possible, avoid using complex or expensive if condition scripts. Expensive condition scripts can slow indexing speeds.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": """
            Collection tags = ctx.tags;
            if(tags != null){
              for (String tag : tags) {
                if (tag.toLowerCase().contains('prod')) {
                  return false;
                }
              }
            }
            return true;
        """
      }
    }
  ]
}

You can also specify a stored script as the if condition.

PUT _scripts/my-prod-tag-script
{
  "script": {
    "lang": "painless",
    "source": """
      Collection tags = ctx.tags;
      if(tags != null){
        for (String tag : tags) {
          if (tag.toLowerCase().contains('prod')) {
            return false;
          }
        }
      }
      return true;
    """
  }
}

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": { "id": "my-prod-tag-script" }
      }
    }
  ]
}

Incoming documents often contain object fields. If a processor script attempts to access a field whose parent object does not exist, Elasticsearch returns a NullPointerException. To avoid these exceptions, use null safe operators, such as ?., and write your scripts to be null safe.

For example, ctx.network?.name.equalsIgnoreCase('Guest') is not null safe. ctx.network?.name can return null. Rewrite the script as 'Guest'.equalsIgnoreCase(ctx.network?.name), which is null safe because Guest is always non-null.

If you can’t rewrite a script to be null safe, include an explicit null check.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "drop": {
        "if": "ctx.network?.name != null && ctx.network.name.contains('Guest')"
      }
    }
  ]
}

Conditionally apply pipelinesedit

Combine an if condition with the pipeline processor to apply other pipelines to documents based on your criteria. You can use this pipeline as the default pipeline in an index template used to configure multiple data streams or indices.

The following pipeline applies different pipelines to incoming documents based on the service.name field value.

PUT _ingest/pipeline/one-pipeline-to-rule-them-all
{
  "processors": [
    {
      "pipeline": {
        "if": "ctx.service?.name == 'apache_httpd'",
        "name": "httpd_pipeline"
      }
    },
    {
      "pipeline": {
        "if": "ctx.service?.name == 'syslog'",
        "name": "syslog_pipeline"
      }
    },
    {
      "fail": {
        "if": "ctx.service?.name != 'apache_httpd' && ctx.service?.name != 'syslog'",
        "message": "This pipeline requires service.name to be either `syslog` or `apache_httpd`"
      }
    }
  ]
}

Get pipeline usage statisticsedit

Use the node stats API to get global and per-pipeline ingest statistics. Use these stats to determine which pipelines run most frequently or spend the most time processing.

GET _nodes/stats/ingest?filter_path=nodes.*.ingest