Indexing documents with the NEST Elasticsearch .NET client | Elastic Blog

Introduction

There are a number of ways you can index documents into Elasticsearch using the NEST Elasticsearch .NET client.

This blog post will demonstrate some of the simple methods, from indexing a single document at a time, to more advanced methods using the BulkObservable helper.

Single documents

Within NEST, a document is modelled as a POCO (Plain Old CLR Object); an example is given below:

public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

An instance of this object, which represents a single document in Elasticsearch, can then be indexed using a few different methods. Let's use the following instance as an example:

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

The IndexDocument<T> and IndexDocumentAsync<T> methods provide a simple way to index a single document of type T, using default parameters. The result of this method call can be inspected to determine if the indexing operation was successful.

// synchronous method that returns an IIndexResponse object
var indexResponse = client.IndexDocument(person);
// asynchronous method that returns a Task<IIndexResponse> that can be awaited
var indexResponseAsync = await client.IndexDocumentAsync(person);
// Inspect the result of the synchronous operation
if (!indexResponse.IsValid)
{
    // If the request isn't valid, we can take action here
}

The IsValid property can be used to check whether a response is functionally valid. This is a NEST abstraction that provides a single point for checking whether something went wrong with the request.
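
When a response is not valid, NEST responses also expose diagnostic details that can help pinpoint the cause. A minimal sketch (exact property availability may vary slightly between NEST versions):

```csharp
var indexResponse = client.IndexDocument(person);
if (!indexResponse.IsValid)
{
    // human-readable audit trail of the request and response
    Console.WriteLine(indexResponse.DebugInformation);
    // the error returned by Elasticsearch, if the server responded with one
    Console.WriteLine(indexResponse.ServerError?.Error?.Reason);
    // the client-side exception, if any (e.g. a connection failure)
    Console.WriteLine(indexResponse.OriginalException?.Message);
}
```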

If you need to set additional parameters when indexing a document, you can use the fluent or object initializer syntax. This will give you finer control over the indexing process. In the example below we will index the document into an index named “people”.

// fluent syntax
var fluentIndexResponse = client.Index(person, i => i.Index("people"));
// object initializer syntax
var initializerIndexResponse = client.Index(new IndexRequest<Person>(person, "people"));

A naïve approach to indexing multiple documents would be to simply create a loop to index a single document on each iteration; however, this is a very inefficient approach that will not scale well for large document collections.
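
For illustration, the naïve approach looks like the following, assuming a collection named people of Person instances. Each iteration issues its own HTTP request, which is what makes it inefficient:

```csharp
// inefficient: one HTTP request per document
foreach (var person in people)
{
    var singleResponse = client.IndexDocument(person);
}
```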

Multiple documents

The bulk API can be used for indexing multiple documents. First, let’s create a collection of documents to index:

var people = new []
{
    new Person
    {
        Id = 1,
        FirstName = "Martijn",
        LastName = "Laarman"
    },
    new Person
    {
        Id = 2,
        FirstName = "Stuart",
        LastName = "Cam"
    },
    new Person
    {
        Id = 3,
        FirstName = "Russ",
        LastName = "Cam"
    }
    // snip
};

Multiple documents can be indexed using the IndexMany and IndexManyAsync methods, which index synchronously and asynchronously, respectively. These methods are specific to the NEST client; they wrap calls to the client's Bulk method and the bulk API, providing a convenient shortcut for indexing many documents.

Note that these methods index all documents in a single HTTP request, so for very large document collections, you need to partition the collection into many smaller batches and issue multiple Bulk calls. When you find yourself needing to do this, consider using the BulkAllObservable<T> helper instead, described later in the post.
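
If you do partition the collection manually, a minimal sketch using LINQ might look like this (the batch size of 1000 is an arbitrary choice for illustration):

```csharp
const int batchSize = 1000;
// group the documents into batches and issue one bulk request per batch
foreach (var batch in people
    .Select((person, index) => new { person, index })
    .GroupBy(x => x.index / batchSize, x => x.person))
{
    var batchResponse = client.IndexMany(batch);
    if (batchResponse.Errors)
    {
        // inspect batchResponse.ItemsWithErrors here
    }
}
```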

// synchronous method that returns an IBulkResponse
var indexManyResponse = client.IndexMany(people);
if (indexManyResponse.Errors)
{
    // the response can be inspected for errors
    foreach (var itemWithError in indexManyResponse.ItemsWithErrors)
    {
        // if there are errors, they can be enumerated and inspected
        Console.WriteLine("Failed to index document {0}: {1}",
            itemWithError.Id, itemWithError.Error);
    }
}
// alternatively, documents can be indexed asynchronously
var indexManyAsyncResponse = await client.IndexManyAsync(people);

If you require finer-grained control over indexing many documents, you can use the Bulk and BulkAsync methods and use descriptors to customise the bulk calls.

As with the IndexMany methods above, documents are sent to the _bulk endpoint in a single HTTP request. This does mean that consideration will need to be given to the overall size of the HTTP request. For indexing large numbers of documents you'll likely want to use the BulkAllObservable<T> helper.

// returns an IBulkResponse that can be inspected for errors
var bulkIndexResponse = client.Bulk(b => b
    .Index("people")
    .IndexMany(people)
);
// asynchronous version
var asyncBulkIndexResponse = await client.BulkAsync(b => b
    .Index("people")
    .IndexMany(people)
);

BulkAllObservable<T> helper

Using the BulkAllObservable<T> helper allows you to focus on the overall objective of indexing a collection of documents, without having to concern yourself with retry, backoff or batching mechanics.

Multiple documents can be indexed using the BulkAll method and the BlockingSubscribeExtensions Wait() extension method. This helper exposes functionality to automatically retry and back off in the event of an indexing failure, and to control the number of documents indexed in a single HTTP request.

In the following example, each request indexes 1000 documents, batched from the original input. For a large number of documents this could result in many HTTP requests, each containing 1000 documents (the last request may contain fewer, depending on the total number).

The helper lazily enumerates an IEnumerable<T> collection, allowing you to index a large number of documents easily, such as documents materialized from paginated database records.
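
For example, documents can be yielded lazily from paginated reads. FetchPage below is a hypothetical data-access method standing in for your own database code:

```csharp
// lazily yields documents one page at a time; a page is only fetched
// when BulkAll's batching enumerates that far into the sequence
static IEnumerable<Person> ReadPeopleFromDatabase()
{
    var pageIndex = 0;
    while (true)
    {
        // FetchPage is a hypothetical method returning one page of records
        List<Person> page = FetchPage(pageIndex++, pageSize: 1000);
        if (page.Count == 0)
            yield break;
        foreach (var person in page)
            yield return person;
    }
}
```

The resulting sequence can then be passed straight to BulkAll, e.g. client.BulkAll(ReadPeopleFromDatabase(), b => b.Index("people")).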

var bulkAllObservable = client.BulkAll(people, b => b
    .Index("people")
    // how long to wait between retries
    .BackOffTime("30s") 
    // how many retries are attempted if a failure occurs
    .BackOffRetries(2) 
    // refresh the index once the bulk operation completes
    .RefreshOnCompleted()
    // how many concurrent bulk requests to make
    .MaxDegreeOfParallelism(Environment.ProcessorCount)
    // number of items per bulk request
    .Size(1000)
)
// Perform the indexing, waiting up to 15 minutes. 
// Whilst the BulkAll calls are asynchronous this is a blocking operation
.Wait(TimeSpan.FromMinutes(15), next =>
{
    // do something on each response e.g. write number of batches indexed to console
});

The BulkAllObservable<T> helper exposes a number of advanced features.

  1. BufferToBulk allows for the customisation of individual operations within the bulk request before it is dispatched to the server.
  2. RetryDocumentPredicate enables fine-grained control over deciding whether a document that failed to be indexed should be retried.
  3. DroppedDocumentCallback is invoked in the event that a document could not be indexed, even after retrying.

client.BulkAll(people, b => b
    .BufferToBulk((descriptor, list) =>
    {
        // customise the individual operations in the bulk
        // request before it is dispatched
        foreach (var item in list)
        {
           // index each document into either even-index or odd-index
           descriptor.Index<Person>(bi => bi
              .Index(item.Id % 2 == 0 ? "even-index" : "odd-index")
              .Document(item)
           );
        }
    })
    .RetryDocumentPredicate((item, person) =>
    {
        // decide if a document should be retried in the event of a failure
        return item.Error.Index == "even-index" && person.FirstName == "Martijn";
    })
    .DroppedDocumentCallback((item, person) =>
    {
        // if a document cannot be indexed this delegate is called
        Console.WriteLine($"Unable to index: {item} {person}");
    })
);

Ingest nodes

Since Elasticsearch will automatically reroute ingest requests to ingest nodes, you don't have to specify or configure any routing information. However, if you're doing heavy ingestion and have dedicated ingest nodes, it makes sense to send index requests to these nodes directly, to avoid any extra hops in the cluster.

The simplest way to achieve this is to create a dedicated "indexing" client instance, and use it for indexing requests.

// list of ingest nodes
var pool = new StaticConnectionPool(new []
{
    new Uri("http://ingestnode1:9200"),
    new Uri("http://ingestnode2:9200"),
    new Uri("http://ingestnode3:9200")
});
var settings = new ConnectionSettings(pool);
var indexingClient = new ElasticClient(settings);

In complex cluster configurations it can be easier to use a sniffing connection pool along with a node predicate to filter out the nodes that have ingest capabilities. This allows you to customise the cluster and not have to reconfigure the client.

// list of cluster nodes
var pool = new SniffingConnectionPool(new []
{
    new Uri("http://node1:9200"),
    new Uri("http://node2:9200"),
    new Uri("http://node3:9200")
});
// predicate to select only nodes with ingest capabilities
var settings = new ConnectionSettings(pool).NodePredicate(n => n.IngestEnabled);
var indexingClient = new ElasticClient(settings);

Ingest pipelines

Let’s modify our Person type to include some additional information:

public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string IpAddress { get; set; }
    public GeoIp GeoIp { get; set; }
}
public class GeoIp
{
    public string CityName { get; set; }
    public string ContinentName { get; set; }
    public string CountryIsoCode { get; set; }
    public GeoLocation Location { get; set; }
    public string RegionName { get; set; }
}

We can create an ingest pipeline that manipulates the incoming values before they are indexed. Let’s assume that our application always expects surnames to be capitalised, and for initials to be indexed into their own field. We also have an IP address that we'd like to convert into a human-readable location.

We can achieve this by creating a custom mapping and an ingest pipeline. The new Person type can then be used without making any further changes.

First, we’ll create the index and custom mapping:

client.CreateIndex("people", c => c
    .Mappings(ms => ms
        .Map<Person>(p => p
            //automatically create the mapping from the type
            .AutoMap()
            //override any inferred mappings from AutoMap() 
            .Properties(props => props
                // create an additional field to store the initials
                .Keyword(t => t.Name("initials"))
                 //map field as IP Address type
                .Ip(t => t.Name(dv => dv.IpAddress))
                // map GeoIp as object
                .Object<GeoIp>(t => t.Name(dv => dv.GeoIp))
            )
        )
    )
);

Next we’ll create an ingest pipeline, taking advantage of the ingest-geoip plugin, which is bundled with Elasticsearch as of version 6.7.

client.PutPipeline("person-pipeline", p => p
    .Processors(ps => ps
        //uppercase the lastname
        .Uppercase<Person>(s => s
            .Field(t => t.LastName)
        )
        // use a painless script to populate the new field
        .Script(s => s
            .Lang("painless") 
            .Source("ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)")
        )
        // use ingest-geoip plugin to enrich the GeoIp object from the supplied IP Address
        .GeoIp<Person>(s => s 
            .Field(i => i.IpAddress)
            .TargetField(i => i.GeoIp)
        )
    )
);

Now let’s index a Person instance using this new index and ingest pipeline.

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman",
    IpAddress = "139.130.4.5"
};
// index the document using the created pipeline
var indexResponse = client.Index(person, p => p
    .Index("people")
    .Pipeline("person-pipeline")
);

Searching now shows the indexed document with the enriched values.
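
The response below can be produced with a simple search; a match_all query is used here as a sketch, but any query that matches the document works:

```csharp
var searchResponse = client.Search<Person>(s => s
    .Index("people")
    .Query(q => q.MatchAll())
);
```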

{
  "took": 5,
  "timed_out": false,
  "_shards": {
    "total": 1,
    "successful": 1,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 1,
    "max_score": 1,
    "hits": [
      {
        "_index": "people",
        "_type": "person",
        "_id": "1",
        "_score": 1,
        "_source": {
          "firstName": "Martijn",
          "lastName": "LAARMAN",
          "initials": "ML",
          "geoIp": {
            "continent_name": "Oceania",
            "region_iso_code": "AU-NSW",
            "city_name": "Sydney",
            "country_iso_code": "AU",
            "region_name": "New South Wales",
            "location": {
              "lon": 151.2167,
              "lat": -33.7333
            }
          },
          "ipAddress": "139.130.4.5",
          "id": 1
        }
      }
    ]
  }
}

When a pipeline is specified, there is the added overhead of document enrichment at index time; in the example above, the execution of the uppercase processor, the Painless script, and the GeoIP lookup.

For large bulk requests, it could be prudent to increase the default indexing timeout to avoid exceptions.

client.Bulk(b => b
    .Index("people")
    .Pipeline("person-pipeline")
    //increases the timeout on Elasticsearch server-side
    .Timeout("5m") 
    .IndexMany<Person>(people)
    .RequestConfiguration(rc => rc
        // increases the HTTP request timeout on the client, before aborting request
        .RequestTimeout(TimeSpan.FromMinutes(5)) 
    )
);

In summary

In this blog post we have covered the simple case of indexing a single document, through to bulk indexing multiple documents with ingest pipelines.

Give it a try in your own cluster, or spin up a 14-day free trial of the Elasticsearch Service on Elastic Cloud. And if you run into any problems or have any questions, reach out on the Discuss forums.

For the full documentation of indexing using the NEST Elasticsearch .NET client please refer to our docs.