Reindex API

Reindex Request

A ReindexRequest can be used to copy documents from one or more indexes into a destination index.

It requires an existing source index and a destination index, which may or may not exist before the request is made. Reindex does not attempt to set up the destination index, and it does not copy the settings of the source index. You should set up the destination index before running a _reindex action, including its mappings, shard count, replicas, and so on.

The simplest form of a ReindexRequest looks like this:

ReindexRequest request = new ReindexRequest(); 
request.setSourceIndices("source1", "source2"); 
request.setDestIndex("dest");  

Creates the ReindexRequest

Adds a list of sources to copy from

Adds the destination index

The dest element can be configured like the index API to control optimistic concurrency. Leaving out versionType (as above) or setting it to internal causes Elasticsearch to blindly dump documents into the target. Setting versionType to external causes Elasticsearch to preserve the version from the source, create any documents that are missing, and update any documents that have an older version in the destination index than they do in the source index.

request.setDestVersionType(VersionType.EXTERNAL); 

Set the versionType to EXTERNAL
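
The difference between the two version types can be illustrated with a small stand-alone model. This is only a sketch of the rule described above, not Elasticsearch's implementation: the class VersionTypeModel and its methods are hypothetical names, and the destination index is reduced to a map from document ID to version.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the destination-side version check
// (hypothetical names, not Elasticsearch code).
class VersionTypeModel {
    // Destination index reduced to: document ID -> stored version.
    final Map<String, Long> dest = new HashMap<>();

    // INTERNAL (the default): the incoming document always overwrites.
    // The source version is ignored; the destination bumps its own version.
    boolean writeInternal(String id) {
        dest.merge(id, 1L, (old, one) -> old + 1);
        return true;
    }

    // EXTERNAL: write only if the document is missing or the stored version is older.
    // Returns false to signal a version conflict.
    boolean writeExternal(String id, long sourceVersion) {
        Long current = dest.get(id);
        if (current != null && current >= sourceVersion) {
            return false; // destination is as new or newer: version conflict
        }
        dest.put(id, sourceVersion);
        return true;
    }

    public static void main(String[] args) {
        VersionTypeModel model = new VersionTypeModel();
        model.dest.put("a", 5L);
        System.out.println(model.writeExternal("a", 3L)); // older source version: false
        System.out.println(model.writeExternal("a", 7L)); // newer source version: true
        System.out.println(model.writeExternal("b", 1L)); // missing in destination: true
    }
}
```

In this model a rejected external write corresponds to a version conflict, which the conflicts setting then decides how to handle.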

Setting opType to create will cause _reindex to only create missing documents in the target index. All existing documents will cause a version conflict. The default opType is index.

request.setDestOpType("create"); 

Set the opType to create

By default, version conflicts abort the _reindex process, but you can count them and continue instead:

request.setConflicts("proceed"); 

Set proceed on version conflict
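
How opType create interacts with the conflicts setting can be sketched with a plain-Java model. The class CreateOnlyModel and its copy method are hypothetical and only mimic the behaviour described above; the real process works in batches and reports its counts through BulkByScrollResponse.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified model of opType=create combined with the conflicts setting
// (hypothetical names, not Elasticsearch code).
class CreateOnlyModel {
    // Copies IDs into dest with create-only semantics and returns {created, versionConflicts}.
    // With proceedOnConflict == false the first conflict aborts the copy by throwing,
    // mirroring the default "abort" behaviour. Documents written before the abort stay written.
    static long[] copy(List<String> sourceIds, Set<String> dest, boolean proceedOnConflict) {
        long created = 0;
        long conflicts = 0;
        for (String id : sourceIds) {
            if (dest.add(id)) {
                created++;   // document was missing: created
            } else if (proceedOnConflict) {
                conflicts++; // document exists: count the conflict and continue
            } else {
                throw new IllegalStateException("version conflict on document " + id);
            }
        }
        return new long[] {created, conflicts};
    }

    public static void main(String[] args) {
        Set<String> dest = new HashSet<>(Arrays.asList("1", "2"));
        long[] result = copy(Arrays.asList("1", "2", "3", "4"), dest, true);
        System.out.println("created=" + result[0] + ", conflicts=" + result[1]);
    }
}
```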

You can limit the documents by adding a query.

request.setSourceQuery(new TermQueryBuilder("user", "kimchy")); 

Only copy documents which have field user set to kimchy

It’s also possible to limit the number of processed documents by setting maxDocs.

request.setMaxDocs(10); 

Only copy 10 documents

By default _reindex uses batches of 1000. You can change the batch size with sourceBatchSize.

request.setSourceBatchSize(100); 

Use batches of 100 documents
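
As a rough guide to how maxDocs and the batch size interact, the number of batches can be estimated with ceiling division. This is back-of-the-envelope arithmetic under the assumption of a single, unsliced request with no retries; BatchMath and its parameters are hypothetical names, and the value reported by getBatches() may differ.

```java
// Rough estimate of how many batches a reindex needs (hypothetical helper).
class BatchMath {
    // matchingDocs: documents matched by the source query
    // maxDocs:      upper limit on processed documents, or a negative value for "no limit"
    // batchSize:    documents per batch (the default mentioned above is 1000)
    static long batches(long matchingDocs, long maxDocs, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        long toProcess = maxDocs < 0 ? matchingDocs : Math.min(matchingDocs, maxDocs);
        return (toProcess + batchSize - 1) / batchSize; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(batches(2500, -1, 1000)); // default batch size
        System.out.println(batches(2500, -1, 100));  // smaller batches
        System.out.println(batches(2500, 10, 100));  // limited by maxDocs
    }
}
```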

Reindex can also use the ingest feature by specifying a pipeline.

request.setDestPipeline("my_pipeline"); 

Set the pipeline to my_pipeline

If you want a particular set of documents from the source index you’ll need to use sort. If possible, prefer a more selective query to maxDocs and sort.

request.addSortField("field1", SortOrder.DESC); 
request.addSortField("field2", SortOrder.ASC); 

Add a descending sort on field1

Add an ascending sort on field2

ReindexRequest also supports a script that modifies the document, including the document’s metadata. The following example modifies a field in the document source:

request.setScript(
    new Script(
        ScriptType.INLINE, "painless",
        "if (ctx._source.user == 'kimchy') {ctx._source.likes++;}",
        Collections.emptyMap())); 

Set a script that increments the likes field on all documents with user kimchy.

ReindexRequest supports reindexing from a remote Elasticsearch cluster. When using a remote cluster, the query should be specified inside the RemoteInfo object and not with setSourceQuery. Setting both the remote info and the source query results in a validation error during the request. The reason is that the remote Elasticsearch may not understand queries built by the modern query builders: remote cluster support works all the way back to Elasticsearch 0.90, and the query language has changed since then. When targeting older versions, it is safer to write the query by hand in JSON.

request.setRemoteInfo(
    new RemoteInfo(
        "http", remoteHost, remotePort, null,
        new BytesArray(new MatchAllQueryBuilder().toString()),
        user, password, Collections.emptyMap(),
        new TimeValue(100, TimeUnit.MILLISECONDS),
        new TimeValue(100, TimeUnit.SECONDS)
    )
); 

Set the remote Elasticsearch cluster

ReindexRequest can also parallelize the process automatically, using sliced scroll to slice on _uid. Use setSlices to specify the number of slices to use.

request.setSlices(2); 

Set the number of slices to use
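
The idea behind slicing can be sketched as hashing each document identifier and assigning it to a slice by the remainder, so that every document lands in exactly one slice and the slices can be processed in parallel. The model below is an illustration only: SliceModel is a hypothetical name, and String.hashCode stands in for whatever hash function Elasticsearch actually uses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative partitioning of document IDs into slices (not Elasticsearch code).
class SliceModel {
    static List<List<String>> slice(List<String> ids, int slices) {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < slices; i++) {
            result.add(new ArrayList<>());
        }
        for (String id : ids) {
            // floorMod keeps the slice index non-negative even for negative hash codes.
            int slice = Math.floorMod(id.hashCode(), slices);
            result.get(slice).add(id);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> parts = slice(Arrays.asList("a", "b", "c", "d", "e"), 2);
        for (int i = 0; i < parts.size(); i++) {
            System.out.println("slice " + i + ": " + parts.get(i));
        }
    }
}
```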

ReindexRequest uses the scroll parameter to control how long it keeps the "search context" alive.

request.setScroll(TimeValue.timeValueMinutes(10)); 

Set the scroll time

Optional arguments

In addition to the options above, the following arguments can also be provided:

request.setTimeout(TimeValue.timeValueMinutes(2)); 

Timeout to wait for the reindex request to be performed as a TimeValue

request.setRefresh(true); 

Refresh index after calling reindex

Synchronous execution

When executing a ReindexRequest in the following manner, the client waits for the BulkByScrollResponse to be returned before continuing with code execution:

BulkByScrollResponse bulkResponse =
        client.reindex(request, RequestOptions.DEFAULT);

Synchronous calls may throw an IOException if the high-level REST client fails to parse the REST response, if the request times out, or in similar cases where no response comes back from the server.

In cases where the server returns a 4xx or 5xx error code, the high-level client tries to parse the response body error details instead and then throws a generic ElasticsearchException and adds the original ResponseException as a suppressed exception to it.

Asynchronous execution

Executing a ReindexRequest can also be done in an asynchronous fashion so that the client can return directly. Users need to specify how the response or potential failures will be handled by passing the request and a listener to the asynchronous reindex method:

client.reindexAsync(request, RequestOptions.DEFAULT, listener); 

The ReindexRequest to execute and the ActionListener to use when the execution completes

The asynchronous method does not block and returns immediately. Once it is completed the ActionListener is called back using the onResponse method if the execution successfully completed or using the onFailure method if it failed. Failure scenarios and expected exceptions are the same as in the synchronous execution case.

A typical listener for reindex looks like:

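ActionListener<BulkByScrollResponse> listener = new ActionListener<BulkByScrollResponse>() {
    @Override
    public void onResponse(BulkByScrollResponse bulkResponse) {
        // Handle the successful response here
    }

    @Override
    public void onFailure(Exception e) {
        // Handle the failure here
    }
};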

Called when the execution is successfully completed.

Called when the whole ReindexRequest fails.
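
If callers prefer to work with a future rather than a callback, the listener contract (exactly one of onResponse or onFailure is invoked) maps naturally onto a CompletableFuture. The sketch below declares its own Listener interface as a stand-in for ActionListener so that it compiles without the client library; ListenerBridge and completing are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Bridges a callback-style listener to a CompletableFuture (illustrative sketch).
class ListenerBridge {
    // Stand-in for the client's ActionListener, declared here so the
    // example is self-contained. It mirrors the two callbacks shown above.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Returns a listener that completes the given future with the outcome.
    static <T> Listener<T> completing(CompletableFuture<T> future) {
        return new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        };
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        Listener<String> listener = completing(future);
        listener.onResponse("reindex finished"); // simulate a successful callback
        System.out.println(future.join());
    }
}
```

With the real client, the same adapter shape would be written against ActionListener<BulkByScrollResponse> and passed to reindexAsync.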

Reindex task submission

It is also possible to submit a ReindexRequest without waiting for its completion by using the Task API. This is equivalent to a REST request with the wait_for_completion flag set to false.

ReindexRequest reindexRequest = new ReindexRequest(); 
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);
reindexRequest.setRefresh(true);

TaskSubmissionResponse reindexSubmission = highLevelClient()
    .submitReindexTask(reindexRequest, RequestOptions.DEFAULT); 

String taskId = reindexSubmission.getTask(); 

A ReindexRequest is constructed the same way as for the synchronous method

The submit method returns a TaskSubmissionResponse, which contains a task identifier.

The task identifier can be used to get the response from a completed task.
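
Task identifiers returned by Elasticsearch take the form nodeId:taskNumber. For calls that expect the node ID and the task number separately, the string can be split as in the sketch below. TaskIdParts is a hypothetical helper written for illustration, and the identifier in main is a made-up value.

```java
// Splits a task identifier of the form "nodeId:taskNumber" (hypothetical helper).
class TaskIdParts {
    final String nodeId;
    final long id;

    TaskIdParts(String nodeId, long id) {
        this.nodeId = nodeId;
        this.id = id;
    }

    static TaskIdParts parse(String taskId) {
        int sep = taskId.lastIndexOf(':');
        if (sep < 1 || sep == taskId.length() - 1) {
            throw new IllegalArgumentException("malformed task id [" + taskId + "]");
        }
        // Long.parseLong throws NumberFormatException if the number part is not numeric.
        return new TaskIdParts(taskId.substring(0, sep), Long.parseLong(taskId.substring(sep + 1)));
    }

    public static void main(String[] args) {
        TaskIdParts parts = parse("r1A2WoRbTwKZ516z6NEs5A:36619"); // made-up identifier
        System.out.println(parts.nodeId + " / " + parts.id);
    }
}
```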

Reindex Response

The returned BulkByScrollResponse contains information about the executed operations and allows you to inspect each result as follows:

TimeValue timeTaken = bulkResponse.getTook(); 
boolean timedOut = bulkResponse.isTimedOut(); 
long totalDocs = bulkResponse.getTotal(); 
long updatedDocs = bulkResponse.getUpdated(); 
long createdDocs = bulkResponse.getCreated(); 
long deletedDocs = bulkResponse.getDeleted(); 
long batches = bulkResponse.getBatches(); 
long noops = bulkResponse.getNoops(); 
long versionConflicts = bulkResponse.getVersionConflicts(); 
long bulkRetries = bulkResponse.getBulkRetries(); 
long searchRetries = bulkResponse.getSearchRetries(); 
TimeValue throttled = bulkResponse.getStatus().getThrottled(); 
TimeValue throttledUntil =
        bulkResponse.getStatus().getThrottledUntil(); 
List<ScrollableHitSource.SearchFailure> searchFailures =
        bulkResponse.getSearchFailures(); 
List<BulkItemResponse.Failure> bulkFailures =
        bulkResponse.getBulkFailures(); 

Get total time taken

Check if the request timed out

Get total number of docs processed

Number of docs that were updated

Number of docs that were created

Number of docs that were deleted

Number of batches that were executed

Number of docs that were skipped (no-ops)

Number of version conflicts

Number of times request had to retry bulk index operations

Number of times request had to retry search operations

The total time this request has throttled itself not including the current throttle time if it is currently sleeping

Remaining delay of any current throttle sleep or 0 if not sleeping

Failures during search phase

Failures during bulk index operation