Tech Topics

Ingest Node: A Client's Perspective

With the first alpha release of Elasticsearch 5.0 comes a ton of new and awesome features, and if you've been paying attention then you know that one of the more prominent of these features is the new shiny ingest node. Simply put, ingest aims to provide a lightweight solution for pre-processing and enriching documents within Elasticsearch itself before they are indexed.

We aren't going to dive into the details of how ingest node works in this blog post (it is recommended that you read the docs as a prerequisite), but instead we're going to showcase how to consume the ingest APIs from an Elasticsearch client. In these examples we're going to use NEST, the official .NET client, but keep in mind that these concepts apply to any of the official Elasticsearch language clients. So whether you're C# inclined or not, transferring this knowledge to the language of your choice should be trivial.

Creating an ingestion pipeline

In Elasticsearch 5.0, all nodes are ingest nodes by default, so there's nothing to do in terms of setting up Elasticsearch. Thus, the first step to enriching documents via an ingest node is to create an ingestion pipeline.

Let's use the classic Tweet example (my apologies) and create a tweets index in our Elasticsearch cluster with the following mapping:

{
  "tweets": {
    "mappings": {
      "tweet": {
        "properties": {
          "lang": {
            "type": "keyword"
          },
          "message": {
            "type": "text"
          },
          "retweets": {
            "type": "integer",
            "coerce": false
          }
        }
      }
    }
  }
}

Here, message contains the actual content of the tweet, lang represents the language code (en, fr, etc...), and retweets is the number of times the tweet has been re-tweeted.

Now imagine we have the following C# type to model our tweet documents:

public class Tweet
{
    public string Message { get; set; }
    public string Lang { get; set; }
    public string Retweets { get; set; }
}

Notice that Retweets is of type string, and perhaps for some reason we cannot to change the type. Well, since we set "coerce": false in our mapping of the retweets field, if we try to index one of these documents, Elasticsearch is going to throw a parse exception since the incoming value will be a string.

So, what can we do? Let's create a pipeline that converts our retweets field to an integer before indexing it. While we're at it, let's also uppercase our language codes since they are of type keyword in our mapping and so case-sensitivity matters.

Here's what this looks like using NEST:

var client = new ElasticClient();
client.PutPipeline("tweet-pipeline", p => p
    .Processors(ps => ps
        .Convert<Tweet>(c => c
            .Field(t => t.Retweets)
            .Type(ConvertProcessorType.Integer)
        )
        .Uppercase<Tweet>(u => u
            .Field(t => t.Lang)
        )
    )
);

So what we've done here is used the put pipeline API to create a pipeline with the id "tweet-pipeline" that has two processors. A convert processor for converting retweets from string to integer, and an uppercase processor for, you guessed it, upper-casing the value of our lang field.

Indexing documents

Now that we've created our pipeline, let's index some documents using the bulk API and enrich them through the pipeline.

client.Bulk(b => b
    .Index("tweets")
    .Pipeline("tweet-pipeline")
    .Index<Tweet>(i => i
        .Document(new Tweet { Retweets = "4", Message = "Hello, Twitter!", Lang = "en" })
    )
    .Index<Tweet>(i => i
        .Document(new Tweet { Retweets = "32", Message = "Bonjour, Twitter!", Lang = "fr" })
    )
    .Index<Tweet>(i => i
        .Document(new Tweet { Retweets = "", Message = "Hallo, Twitter !", Lang = "nl" })
    )
);

Business as usual, except notice we specified a pipeline. This tells Elasticsearch we want to pre-process each document using the "tweets-pipeline" we created earlier before indexing. Here we specified the pipeline for all index commands, but we could specify a different pipeline for each individual index command if we had multiple pipelines.

Handling errors

If we inspect the response from our bulk request:

{
  "took" : 33,
  "ingest_took" : 4,
  "errors" : true,
  "items" : [ {
    "index" : {
      "_index" : "tweets",
      "_type" : "tweet",
      "_id" : "AVSl-cCKVD5bKRQTTXNo",
      "_version" : 1,
      "_shards" : {
        "total" : 2,
        "successful" : 1,
        "failed" : 0
      },
      "created" : true,
      "status" : 201
    }
  }, {
    "index" : {
      "_index" : "tweets",
      "_type" : "tweet",
      "_id" : "AVSl-cCKVD5bKRQTTXNp",
      "_version" : 1,
      "_shards" : {
        "total" : 2,
        "successful" : 1,
        "failed" : 0
      },
      "created" : true,
      "status" : 201
    }
  }, {
    "index" : {
      "_index" : "tweets",
      "_type" : "tweet",
      "_id" : null,
      "status" : 400,
      "error" : {
        "type" : "illegal_argument_exception",
        "reason" : "unable to convert [] to integer",
        "caused_by" : {
          "type" : "number_format_exception",
          "reason" : "For input string: \"\""
        }
      }
    }
  } ]
}

We'll notice that our last document failed. Why? Well if we take a look at the document we tried to index we'll notice that the value Retweets was an empty string and our convert processor didn't know what to do with it.

We can address this by telling our processor what to do when it encounters a failure. Each processor has an on_failure property, which accepts more processors that it will execute when an error occurs. These nested processors themselves also have an on_failure property which you can further nest error handling, and so on and so forth. Also, the pipeline itself has its' own on_failure which you can set as a "catch all" error handler for the entire pipeline.

For our tweet-pipeline example, let's just update our convert processor and add a set processor to on_failure that will set retweets to 0 if it fails to convert the original value.

 client.PutPipeline("tweet-pipeline", p => p
    .Processors(ps => ps
        .Uppercase<Tweet>(u => u
            .Field(twt => t.Lang)
        )
        .Convert<Tweet>(c => c
            .Field(twt => twt.Retweets)
            .Type(ConvertProcessorType.Integer)
            .OnFailure(f => f
                .Set<Tweet>(s => s
                    .Field(t => t.Retweets)
                    .Value(0)
                )
            )
        )
    )
);

If we index the document again (this time just using the index API), we'll get a 201 back from Elasticsearch:

client.Index(new Tweet { Retweets = "", Message = "Hallo, Twitter !", Lang = "nl" }, i => i
    .Index("tweets")
    .Pipeline("tweet-pipeline")
);

Now let's take a look at the documents in our index:

"hits": [
  {
    "_index": "tweets",
    "_type": "tweet",
    "_id": "AVSmDI7jVD5bKRQTTXin",
    "_score": 1,
    "_source": {
      "retweets": 32,
      "message": "Bonjour, Twitter!",
      "lang": "FR"
    }
  },
  {
    "_index": "tweets",
    "_type": "tweet",
    "_id": "AVSmDJB6VD5bKRQTTXio",
    "_score": 1,
    "_source": {
      "retweets": 0,
      "message": "Hallo, Twitter !",
      "lang": "NL"
    }
  },
  {
    "_index": "tweets",
    "_type": "tweet",
    "_id": "AVSmDI7jVD5bKRQTTXim",
    "_score": 1,
    "_source": {
      "retweets": 4,
      "message": "Hello, Twitter!",
      "lang": "EN"
    }
  }
]

All of our language codes are uppercase and retweets for our NL has been set to 0!

That is — in a nutshell — ingest node.

Client considerations

Dedicated ingest nodes

Since Elasticsearch will automatically reroute ingest requests to ingest nodes, you don't have to specify or configure any routing information. However, if you're doing heavy ingestion and have dedicated ingest nodes, it makes sense to send index requests to these nodes directly, to avoid any extra hops in the cluster.

The simplest way to achieve this with any Elasticsearch client is to create a dedicated "indexing" client instance, and use it for indexing requests:

var pool = new StaticConnectionPool(new []
{
    new Uri("http://ingestnode1:9200"),
    new Uri("http://ingestnode2:9200"),
    new Uri("http://ingestnode3:9200")
});
var settings = new ConnectionSettings(pool);
var indexingClient = new ElasticClient(settings);

Increasing timeouts

When a pipeline is specified, there is an added overhead of document enrichment when indexing a document. For large bulk requests, you might need to increase the default indexing timeout (1m) to avoid exceptions.

Keep in mind, that the client may have its own request timeout — this should be increased as well, at least to the same value as the Elasticsearch timeout.

client.Bulk(b => b
    .Index("tweets")
    .Pipeline("tweet-pipeline")
    .Timeout("5m") // Increases the bulk timeout to 5 minutes
    .Index<Tweet>(/*snip*/)
    .Index<Tweet>(/*snip*/)
    .Index<Tweet>(/*snip*/)
    .RequestConfiguration(rc => rc
        .RequestTimeout(TimeSpan.FromMinutes(5)) // Increases the request timeout to 5 minutes
    )
);

Conclusion

In this post I've covered the basics of the new ingest node feature coming in Elasticsearch 5.0 and how to consume the ingest APIs from an Elasticsearch language client. I've only scratched the surface though, so I highly recommend reading the docs and watching the Ingest Node: Enriching Documents within Elasticsearch talk from Elastic{ON}16. NEST supports the ingest node APIs since the 5.0.0-alpha1 release and will work against any Elasticsearch 5.0.0 alpha release. Try it out and let us know what you think!