Reindexing documents

It is sometimes necessary to reindex documents from one index to another. The client provides two methods for doing so: ReindexOnServer and Reindex.

Reindex

The reindex API of Elasticsearch is exposed as the ReindexOnServer method (and its asynchronous counterpart, ReindexOnServerAsync) on the client. The simplest usage defines a source index and a destination index and waits for the operation to complete.

Create and configure the destination index before starting the reindex process. The reindex API does not copy settings or mappings from the source index.
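As a minimal sketch of preparing the destination index, the following checks whether it exists and creates it with an explicit mapping if not. The Person class, the cluster address and the use of AutoMap are assumptions made for illustration; a real destination index would typically need its own settings and mappings.

```csharp
using System;
using Nest;

// Assumes a cluster reachable at this address
var client = new ElasticClient(new ConnectionSettings(new Uri("http://localhost:9200")));

// Create the destination index only if it does not already exist
if (!client.Indices.Exists("destination_index").Exists)
{
    var createResponse = client.Indices.Create("destination_index", c => c
        .Map<Person>(m => m.AutoMap()) // infer the mapping from the POCO's properties
    );

    if (!createResponse.IsValid)
        throw new Exception(createResponse.DebugInformation);
}

// Hypothetical document type, used only for illustration
public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}
```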

var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
    )
    .Destination(d => d
        .Index("destination_index")
    )
    // Wait for the reindex process to complete before returning a response
    .WaitForCompletion()
);

In the example above, Elasticsearch will wait for the reindex process to complete before returning a response to the client. As such, ensure that the client is configured with a sufficient request timeout when using WaitForCompletion.
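One way to configure the timeout is sketched below, either as a default for every request made by the client or for a single reindex request. The cluster address and the five and ten minute values are illustrative assumptions, not recommendations.

```csharp
using System;
using Nest;

// Assumes a cluster reachable at this address
var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
    // Default timeout applied to every request made by this client
    .RequestTimeout(TimeSpan.FromMinutes(5));

var client = new ElasticClient(settings);

var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s.Index("source_index"))
    .Destination(d => d.Index("destination_index"))
    .WaitForCompletion()
    // Alternatively, override the timeout for this request only
    .RequestConfiguration(rc => rc.RequestTimeout(TimeSpan.FromMinutes(10)))
);

if (!reindexResponse.IsValid)
    Console.WriteLine(reindexResponse.DebugInformation);
```

A per-request override keeps the long timeout confined to the reindex call, so other requests still fail fast.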

Instead of waiting for the reindex process to complete, reindex can be run asynchronously on Elasticsearch. The response then contains a task that can be used with the task APIs to get the status of the process or to cancel it. The following example demonstrates this approach:

var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
    )
    .Destination(d => d
        .Index("destination_index")
    )
    // Don't wait for the reindex process to complete before returning a response
    .WaitForCompletion(false)
);

// Get the task id from the response to use to check its progress
var taskId = reindexResponse.Task;
var taskResponse = client.Tasks.GetTask(taskId);

// Whilst the task isn't completed, keep checking
while (!taskResponse.Completed)
{
    // Wait some time before fetching and checking the task again
    Thread.Sleep(TimeSpan.FromSeconds(20));
    taskResponse = client.Tasks.GetTask(taskId);
}

// Get the completed reindex response from the task response and take some action
var completedReindexResponse = taskResponse.GetResponse<ReindexOnServerResponse>();
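Cancelling a running reindex task could look like the sketch below. It assumes the Tasks.Cancel method and its TaskId descriptor method as found in the 7.x client; the cluster address is also an assumption.

```csharp
using System;
using Nest;

// Assumes a cluster reachable at this address
var client = new ElasticClient(new ConnectionSettings(new Uri("http://localhost:9200")));

// Start the reindex without waiting, so that a task id is returned
var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s.Index("source_index"))
    .Destination(d => d.Index("destination_index"))
    .WaitForCompletion(false)
);

var taskId = reindexResponse.Task;

// Ask Elasticsearch to cancel the running reindex task
var cancelResponse = client.Tasks.Cancel(c => c.TaskId(taskId));

if (!cancelResponse.IsValid)
    Console.WriteLine(cancelResponse.DebugInformation);
```

Cancelling does not roll back documents that have already been written to the destination index.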

Reindex with parameters

The reindex API exposes additional parameters to control the reindex process, such as

  • A query on the source index, so that only the matching subset of documents is reindexed
  • A selection of source fields, so that only those fields are reindexed into the destination index
  • An ingest pipeline to run on documents as they are indexed into the destination index

The following example demonstrates some of these parameters:

var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
        // Select only a subset of documents to reindex, from the source index
        .Query<Person>(q => q
            .Term(m => m
                .Field(f => f.FirstName)
                .Value("Russ")
            )
        )
        // Reindex only the first name and last name fields
        .Source<Person>(so => so
            .Field(f => f.FirstName)
            .Field(f => f.LastName)
        )
    )
    .Destination(d => d
        .Index("destination_index")
        // Run an ingest pipeline on documents when they are indexed into the destination index
        .Pipeline("my_reindex_pipeline")
    )
);
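The pipeline named in the destination has to exist before the reindex request is sent. A minimal sketch of registering one is shown below. The uppercase processor, the Person class and the cluster address are illustrative assumptions, since the original does not say what my_reindex_pipeline does.

```csharp
using System;
using Nest;

// Assumes a cluster reachable at this address
var client = new ElasticClient(new ConnectionSettings(new Uri("http://localhost:9200")));

// Register the pipeline before referencing it from the reindex destination
var putPipelineResponse = client.Ingest.PutPipeline("my_reindex_pipeline", p => p
    .Description("Uppercases the last name of each reindexed document")
    .Processors(ps => ps
        // Illustrative processor only; use whichever processors the pipeline needs
        .Uppercase<Person>(u => u
            .Field(f => f.LastName)
        )
    )
);

if (!putPipelineResponse.IsValid)
    throw new Exception(putPipelineResponse.DebugInformation);

// Hypothetical document type, used only for illustration
public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}
```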

Reindex observable

In addition to ReindexOnServer, the client also exposes a Reindex method that uses the Observer Design Pattern to set up a reindex operation and allows observers to be registered to take action during the reindex process.

In contrast to the ReindexOnServer method, which uses the reindex API of Elasticsearch to perform the reindex process entirely on the server, the Reindex method

  1. retrieves batches of documents over the network from the source index on the server
  2. allows modifications to be performed on the client side
  3. makes requests to bulk index the modified documents to the destination index

Such an approach can be more flexible than what is provided by the reindex API, at the cost of many more requests to Elasticsearch and higher network traffic. Both approaches have their uses, so choose the one that best suits your requirements.

You might be wondering why the method that uses the reindex API is named ReindexOnServer, while the method that uses an observable approach is named Reindex. The Reindex method existed on the client long before the reindex API existed in Elasticsearch. Since the two APIs are quite different on the client, the method for the reindex API was named ReindexOnServer when it was introduced, so as not to conflict with the existing method, which is still widely used.

Reindex builds on top of ScrollAllObservable and BulkAllObservable to fetch documents from and index documents into Elasticsearch, respectively. The following example demonstrates a simple use of Reindex:

// Number of slices to split each scroll into
var slices = Environment.ProcessorCount;

var reindexObserver = client.Reindex<Person>(r => r
    // How to fetch documents to be reindexed
    .ScrollAll("5s", slices, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    // How to index fetched documents
    .BulkAll(b => b
        .Index("destination_index")
    )
)
// Wait up to 15 minutes for the reindex process to complete
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // do something with each bulk response e.g. accumulate number of indexed documents
});
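As a small sketch of acting on each bulk response, the callback below counts how many bulk responses were received. The counter, the Person class and the cluster address are assumptions for illustration. Interlocked is used as a precaution, in case callbacks are invoked from more than one thread.

```csharp
using System;
using System.Threading;
using Nest;

// Assumes a cluster reachable at this address
var client = new ElasticClient(new ConnectionSettings(new Uri("http://localhost:9200")));

long bulkResponses = 0;

client.Reindex<Person>(r => r
    .ScrollAll("5s", Environment.ProcessorCount, s => s
        .Search(ss => ss.Index("source_index"))
    )
    .BulkAll(b => b.Index("destination_index"))
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // Count each successful bulk response as it arrives
    Interlocked.Increment(ref bulkResponses);
});

Console.WriteLine($"Received {Interlocked.Read(ref bulkResponses)} bulk responses");

// Hypothetical document type, used only for illustration
public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}
```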

The destination index can be created as part of Reindex. For example, the source index settings can be retrieved and used as the basis for the index settings of the destination index:

// Get the settings for the source index
var getIndexResponse = client.Indices.Get("source_index");
var indexSettings = getIndexResponse.Indices["source_index"];

// Get the mapping for the lastName property
var lastNameProperty = indexSettings.Mappings.Properties["lastName"];

// If the lastName property is a text datatype, add a keyword multi-field
if (lastNameProperty is TextProperty textProperty)
{
    if (textProperty.Fields == null)
        textProperty.Fields = new Properties();

    textProperty.Fields.Add("keyword", new KeywordProperty());
}

var reindexObserver = client.Reindex<Person>(r => r
    .CreateIndex(c => c
        // Use the index settings to create the destination index
        .InitializeUsing(indexSettings)
    )
    .ScrollAll("5s", Environment.ProcessorCount, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    .BulkAll(b => b
        .Index("destination_index")
    )
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // do something with each bulk response e.g. accumulate number of indexed documents
});

Reindex has an overload that accepts a function for how source documents should be mapped to destination documents. In addition, further control over reindexing can be achieved by using an observer to subscribe to the reindexing process to take some action on each successful bulk response, when an error occurs, and when the process has finished. The following example demonstrates these features.

An observer should not throw exceptions from its interface implementations, such as OnNext and OnError. Any exceptions thrown should be expected to go unhandled. In light of this, any exception that occurs during the reindex process should be captured and thrown outside of the observer, as demonstrated in the example below. Take a look at the Observer Design Pattern best practices on handling exceptions.

// ManualResetEvent is in System.Threading, ExceptionDispatchInfo in System.Runtime.ExceptionServices
var reindexObservable = client.Reindex<Person, Person>(
    // A function to define how source documents are mapped to destination documents
    person => person,
    r => r
    .ScrollAll("5s", Environment.ProcessorCount, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    .BulkAll(b => b
        .Index("destination_index")
    )
);

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new ReindexObserver(
    onNext: response =>
    {
        // do something e.g. write number of pages to console
    },
    onError: exception =>
    {
        // Capture the exception so that it can be rethrown outside of the observer
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set());

// Subscribe to the observable, which will initiate the reindex process
reindexObservable.Subscribe(observer);

// Block the current thread until a signal is received
waitHandle.WaitOne();

// If an exception was captured during the reindex process, throw it
exceptionDispatchInfo?.Throw();
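The mapping function does not have to return the source document unchanged. The sketch below maps each source document to a different destination type on the client side. The Person and PersonSummary classes, the FullName field and the cluster address are hypothetical, introduced only to illustrate the overload.

```csharp
using System;
using Nest;

// Assumes a cluster reachable at this address
var client = new ElasticClient(new ConnectionSettings(new Uri("http://localhost:9200")));

client.Reindex<Person, PersonSummary>(
    // Build the destination document from the source document on the client
    person => new PersonSummary
    {
        FullName = $"{person.FirstName} {person.LastName}"
    },
    r => r
        .ScrollAll("5s", Environment.ProcessorCount, s => s
            .Search(ss => ss.Index("source_index"))
        )
        .BulkAll(b => b.Index("destination_index"))
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // do something with each bulk response
});

// Hypothetical source document type
public class Person
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

// Hypothetical destination document type
public class PersonSummary
{
    public string FullName { get; set; }
}
```

A transformation like this runs in the client process, so every document travels over the network twice. That is the trade-off described earlier for Reindex compared with ReindexOnServer.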