Reindexing documents
Sometimes there is a need to reindex documents from one index to another. The client provides two different methods for reindexing documents, ReindexOnServer and Reindex.
Reindex
The reindex API of Elasticsearch is exposed as the ReindexOnServer method (and its asynchronous counterpart, ReindexOnServerAsync) on the client. The simplest usage is to define a source index and a destination index and wait for the operation to complete.

Note: The destination index must exist before starting the reindex process.
var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
    )
    .Destination(d => d
        .Index("destination_index")
    )
    .WaitForCompletion()
);
In the example above, Elasticsearch waits for the reindex process to complete before returning a response to the client. When using WaitForCompletion, ensure that the client is configured with a sufficient request timeout.
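As a minimal sketch of that advice, the request timeout can be raised either for every request made through a client or for the reindex call alone. The node URI and the ten-minute value are placeholders, and the RequestTimeout and RequestConfiguration members are assumed to be those of the NEST 7.x client, so check them against the client version in use.

```csharp
using System;
using Nest;

// Placeholder node address; replace with your own cluster URI.
var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
    // Applies to every request made through this client (illustrative value).
    .RequestTimeout(TimeSpan.FromMinutes(10));

var client = new ElasticClient(settings);

// Alternatively, raise the timeout for this reindex request only.
var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s.Index("source_index"))
    .Destination(d => d.Index("destination_index"))
    .WaitForCompletion()
    .RequestConfiguration(rc => rc
        .RequestTimeout(TimeSpan.FromMinutes(10))
    )
);
```

A per-request timeout keeps the longer wait scoped to the reindex call, so other requests still fail fast under the client-wide default.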
Instead of waiting for the reindex process to complete, reindex can be run asynchronously on Elasticsearch, returning a task that can be used with the task APIs to get the status of the process or to cancel it. The following example demonstrates this approach:
var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
    )
    .Destination(d => d
        .Index("destination_index")
    )
    // Don't wait for the reindex process to complete before returning a response
    .WaitForCompletion(false)
);

// Get the task id from the response to use to check its progress
var taskId = reindexResponse.Task;
var taskResponse = client.Tasks.GetTask(taskId);

// While the task isn't completed, keep checking
while (!taskResponse.Completed)
{
    // Wait some time before fetching and checking the task again
    Thread.Sleep(TimeSpan.FromSeconds(20));
    taskResponse = client.Tasks.GetTask(taskId);
}

// Get the completed reindex response from the task response and take some action
var completedReindexResponse = taskResponse.GetResponse<ReindexOnServerResponse>();
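The task id can also be used to cancel a reindex that is still running. The following is a sketch only: it assumes the Tasks.Cancel method of the NEST 7.x client and its TaskId selector, and the ReindexTasks class and CancelReindex helper are hypothetical names introduced for illustration.

```csharp
using Nest;

public static class ReindexTasks
{
    // Hypothetical helper: asks Elasticsearch to cancel the task that was
    // started by ReindexOnServer with WaitForCompletion(false).
    public static bool CancelReindex(IElasticClient client, TaskId taskId)
    {
        // Assumes the NEST 7.x Tasks.Cancel call with a TaskId selector.
        var cancelResponse = client.Tasks.Cancel(c => c.TaskId(taskId));

        // IsValid reports only that the cancel request itself succeeded.
        return cancelResponse.IsValid;
    }
}
```

It could be called as ReindexTasks.CancelReindex(client, reindexResponse.Task), using the task id returned by the reindex request.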
Reindex with parameters
The reindex API exposes additional parameters to control the reindex process, such as:
- A query to run on the source index, so that only the matching subset of documents is reindexed
- Selecting only a subset of fields from the source documents to reindex into the destination index
- Running an ingest pipeline on documents to be indexed into the destination index
The following example demonstrates some of these parameters:
var reindexResponse = client.ReindexOnServer(r => r
    .Source(s => s
        .Index("source_index")
        .Query<Person>(q => q
            .Term(m => m
                .Field(f => f.FirstName)
                .Value("Russ")
            )
        )
        .Source<Person>(so => so
            .Field(f => f.FirstName)
            .Field(f => f.LastName)
        )
    )
    .Destination(d => d
        .Index("destination_index")
        .Pipeline("my_reindex_pipeline")
    )
);
Reindex observable
In addition to ReindexOnServer, the client also exposes a Reindex method that uses the Observer Design Pattern to set up a reindex operation and allows observers to be registered to take action during the reindex process.

In contrast to the ReindexOnServer method, which uses the reindex API of Elasticsearch to perform the reindex process entirely on the server, the Reindex method:
- retrieves batches of documents over the network from the source index on the server
- allows modifications to be performed on the client side
- makes requests to bulk index the modified documents to the destination index
Such an approach can be more flexible than what the reindex API provides, at the cost of many more requests to Elasticsearch and higher network traffic. Both approaches have their uses, so choose the one that best suits your requirements.
You might be wondering why ReindexOnServer, which uses the reindex API, and Reindex, which uses an observable approach, are named as they are. The Reindex method existed on the client long before the reindex API existed in Elasticsearch. Since the two APIs are quite different on the client, the method for the reindex API was named ReindexOnServer when it was introduced, so as not to conflict with the existing method, which is still widely used.
Reindex builds on top of ScrollAllObservable and BulkAllObservable to fetch documents from, and index documents into, Elasticsearch, respectively. The following example demonstrates a simple use of Reindex:
// Number of slices to split each scroll into
var slices = Environment.ProcessorCount;

var reindexObserver = client.Reindex<Person>(r => r
    // How to fetch documents to be reindexed
    .ScrollAll("5s", slices, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    // How to index fetched documents
    .BulkAll(b => b
        .Index("destination_index")
    )
)
// Wait up to 15 minutes for the reindex process to complete
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // do something with each bulk response e.g. accumulate number of indexed documents
});
An index can be created when using Reindex. For example, the source index settings can be retrieved and used as the basis for the index settings of the destination index:
// Get the settings for the source index
var getIndexResponse = client.Indices.Get("source_index");
var indexSettings = getIndexResponse.Indices["source_index"];

// Get the mapping for the lastName property
var lastNameProperty = indexSettings.Mappings.Properties["lastName"];

// If the lastName property is a text property, add a keyword multi-field to it
if (lastNameProperty is TextProperty textProperty)
{
    if (textProperty.Fields == null)
        textProperty.Fields = new Properties();

    textProperty.Fields.Add("keyword", new KeywordProperty());
}

var reindexObserver = client.Reindex<Person>(r => r
    // Use the index settings to create the destination index
    .CreateIndex(c => c
        .InitializeUsing(indexSettings)
    )
    .ScrollAll("5s", Environment.ProcessorCount, s => s
        .Search(ss => ss
            .Index("source_index")
        )
    )
    .BulkAll(b => b
        .Index("destination_index")
    )
)
.Wait(TimeSpan.FromMinutes(15), response =>
{
    // do something with each bulk response e.g. accumulate number of indexed documents
});
Reindex has an overload that accepts a function for how source documents should be mapped to destination documents. In addition, further control over reindexing can be achieved by using an observer to subscribe to the reindexing process and take some action on each successful bulk response, when an error occurs, and when the process has finished. The following example demonstrates these features.

Note: An observer should not throw exceptions from its interface implementations, such as OnNext and OnError. Any exceptions thrown there should be expected to go unhandled. In light of this, any exception that occurs during the reindex process should be captured and rethrown outside of the observer, as demonstrated in the example below. See the Observer Design Pattern best practices on handling exceptions.
// Requires: using System.Threading; using System.Runtime.ExceptionServices;
var reindexObservable = client.Reindex<Person, Person>(
    // Function that maps each source document to a destination document
    person => person,
    r => r
        .ScrollAll("5s", Environment.ProcessorCount, s => s
            .Search(ss => ss
                .Index("source_index")
            )
        )
        .BulkAll(b => b
            .Index("destination_index")
        )
);

var waitHandle = new ManualResetEvent(false);
ExceptionDispatchInfo exceptionDispatchInfo = null;

var observer = new ReindexObserver(
    onNext: response =>
    {
        // do something e.g. write number of pages to console
    },
    onError: exception =>
    {
        // Capture the exception so it can be rethrown outside of the observer
        exceptionDispatchInfo = ExceptionDispatchInfo.Capture(exception);
        waitHandle.Set();
    },
    onCompleted: () => waitHandle.Set()
);

reindexObservable.Subscribe(observer);

// Block until the observer signals completion or an error
waitHandle.WaitOne();

// Rethrow any captured exception on the calling thread
exceptionDispatchInfo?.Throw();