Bulk API

The Java High Level REST Client provides the Bulk Processor to assist with bulk requests.

Bulk Request

A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.

It requires at least one operation to be added to the Bulk request:

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));

Creates the BulkRequest

Adds a first IndexRequest to the Bulk request. See Index API for more information on how to build IndexRequest.

Adds a second IndexRequest

Adds a third IndexRequest

The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.

Different operation types can be added to the same BulkRequest:

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));

Adds a DeleteRequest to the BulkRequest. See Delete API for more information on how to build DeleteRequest.

Adds an UpdateRequest to the BulkRequest. See Update API for more information on how to build UpdateRequest.

Adds an IndexRequest to the BulkRequest, with its source provided in JSON format
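
To make "a single request" more concrete, the sketch below assembles roughly what the mixed BulkRequest above corresponds to on the REST _bulk endpoint: newline-delimited JSON in which every operation contributes an action line, followed by a payload line for index and update operations. The client performs this serialization itself, so this is only an illustration. BulkBodySketch and its methods are hypothetical names, and the exact bytes the client sends may differ.

```java
// Illustrative only: approximates the newline-delimited JSON body of a
// REST _bulk call. The high-level client builds the real body for you.
final class BulkBodySketch {

    // One action line, optionally followed by one payload line.
    // Pass null as payload for operations without a body (delete).
    static String action(String op, String index, String id, String payload) {
        String meta = "{\"" + op + "\":{\"_index\":\"" + index
                + "\",\"_id\":\"" + id + "\"}}\n";
        return payload == null ? meta : meta + payload + "\n";
    }

    // Mirrors the delete/update/index example from the text.
    static String mixedBody() {
        return action("delete", "posts", "3", null)
                + action("update", "posts", "2", "{\"doc\":{\"other\":\"test\"}}")
                + action("index", "posts", "4", "{\"field\":\"baz\"}");
    }

    public static void main(String[] args) {
        System.out.print(mixedBody());
    }
}
```

Note that the delete operation has no payload line, while the update wraps its partial document in a "doc" object.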

Optional arguments

The following arguments can optionally be provided:

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

Timeout to wait for the bulk request to be performed as a TimeValue

Timeout to wait for the bulk request to be performed as a String

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");                            

Refresh policy as a WriteRequest.RefreshPolicy instance

Refresh policy as a String

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL); 

Sets the number of shard copies that must be active before proceeding with the index/update/delete operations.

Number of shard copies provided as an ActiveShardCount: can be ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)

request.pipeline("pipelineId"); 

Global pipelineId used on all sub requests, unless overridden on a sub request

request.routing("routingId"); 

Global routingId used on all sub requests, unless overridden on a sub request

BulkRequest defaulted = new BulkRequest("posts"); 

A bulk request with a global index used on all sub requests, unless overridden on a sub request. This parameter is @Nullable and can only be set during BulkRequest creation.

Synchronous execution

When executing a BulkRequest in the following manner, the client waits for the BulkResponse to be returned before continuing with code execution:

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

Synchronous calls may throw an IOException if the high-level REST client fails to parse the REST response, if the request times out, or in similar cases where no response comes back from the server.

If the server returns a 4xx or 5xx error code, the high-level client instead tries to parse the error details from the response body, then throws a generic ElasticsearchException with the original ResponseException attached as a suppressed exception.
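
Because the original ResponseException travels as a suppressed exception, callers who need it have to look it up through Throwable.getSuppressed(). The following is a minimal sketch of that lookup using only JDK types: a RuntimeException stands in for ElasticsearchException and an IOException stands in for ResponseException. SuppressedSketch and its methods are hypothetical names, not part of the client.

```java
import java.io.IOException;

final class SuppressedSketch {

    // Stand-in for the client's behavior as described in the text:
    // a generic exception carrying the low-level one as suppressed.
    static RuntimeException wrap(IOException lowLevel) {
        RuntimeException generic = new RuntimeException("server returned an error");
        generic.addSuppressed(lowLevel);
        return generic;
    }

    // Returns the first suppressed exception of the requested type, or null.
    static <T extends Throwable> T firstSuppressed(Throwable e, Class<T> type) {
        for (Throwable suppressed : e.getSuppressed()) {
            if (type.isInstance(suppressed)) {
                return type.cast(suppressed);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        RuntimeException e = wrap(new IOException("HTTP 404"));
        IOException original = firstSuppressed(e, IOException.class);
        System.out.println(original.getMessage());
    }
}
```

With the real client, the same helper could be called from a catch block with the actual exception types in place of the stand-ins.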

Asynchronous execution

Executing a BulkRequest can also be done in an asynchronous fashion so that the client can return directly. Users need to specify how the response or potential failures will be handled by passing the request and a listener to the asynchronous bulk method:

client.bulkAsync(request, RequestOptions.DEFAULT, listener); 

The BulkRequest to execute and the ActionListener to use when the execution completes

The asynchronous method does not block and returns immediately. Once it is completed the ActionListener is called back using the onResponse method if the execution successfully completed or using the onFailure method if it failed. Failure scenarios and expected exceptions are the same as in the synchronous execution case.

A typical listener for bulk looks like:

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};

Called when the execution is successfully completed.

Called when the whole BulkRequest fails.
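
Code that prefers futures over callbacks can bridge the two callbacks to a CompletableFuture. The sketch below shows the idea with a hypothetical Listener interface that mirrors the onResponse and onFailure methods of ActionListener. It uses only JDK types so that it runs without the client on the classpath; ListenerSketch and toFuture are illustrative names, not client APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class ListenerSketch {

    // Hypothetical stand-in mirroring the two callbacks of ActionListener.
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Hands a listener to the asynchronous call and exposes the outcome
    // as a future: completed on onResponse, failed on onFailure.
    static <T> CompletableFuture<T> toFuture(Consumer<Listener<T>> asyncCall) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Listener<T>() {
            @Override
            public void onResponse(T response) {
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                future.completeExceptionally(e);
            }
        });
        return future;
    }

    public static void main(String[] args) {
        // The lambda plays the role of an asynchronous call that succeeds.
        CompletableFuture<String> ok = ListenerSketch.<String>toFuture(l -> l.onResponse("done"));
        System.out.println(ok.join());
    }
}
```

With the real client, the lambda passed to such an adapter would presumably invoke client.bulkAsync with a listener that forwards to the future in the same way.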

Bulk Response

The returned BulkResponse contains information about the executed operations and allows iterating over each result as follows:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
        break;
    }
}

Iterate over the results of all operations

Retrieve the response of the operation (successful or not). It can be an IndexResponse, UpdateResponse or DeleteResponse, all of which can be treated as DocWriteResponse instances

Handle the response of an index operation

Handle the response of an update operation

Handle the response of a delete operation

The Bulk response provides a method to quickly check whether one or more operations have failed:

if (bulkResponse.hasFailures()) { 

}

This method returns true if at least one operation failed

In that case, iterate over all operation results to check whether each operation failed and, if so, retrieve the corresponding failure:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}

Indicate if a given operation failed

Retrieve the failure of the failed operation

Bulk Processor

The BulkProcessor simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor.

In order to execute the requests, the BulkProcessor requires the following components:

RestHighLevelClient
This client is used to execute the BulkRequest and to retrieve the BulkResponse
BulkProcessor.Listener
This listener is called before and after every BulkRequest execution, or when a BulkRequest fails

Then the BulkProcessor.builder method can be used to build a new BulkProcessor:

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build(); 

Create the BulkProcessor.Listener

This method is called before each execution of a BulkRequest

This method is called after each execution of a BulkRequest

This method is called when a BulkRequest failed

Create the BulkProcessor by calling the build() method from the BulkProcessor.Builder. The RestHighLevelClient.bulkAsync() method will be used to execute the BulkRequest under the hood.

The BulkProcessor.Builder provides methods to configure how the BulkProcessor should handle request execution:

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 

Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)

Set when to flush a new bulk request based on the size of actions currently added (defaults to 5MB, use -1 to disable it)

Set the number of concurrent requests allowed to be executed (defaults to 1, use 0 to only allow the execution of a single request)

Set a flush interval: any pending BulkRequest is flushed when the interval passes (defaults to not set)

Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.
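
To illustrate what "waits for 1 second and retries up to 3 times" means in terms of attempts, here is a minimal, generic sketch of a constant backoff retry loop. It is not the BulkProcessor implementation: ConstantBackoffSketch is a hypothetical name, delays are recorded instead of slept so the example runs instantly, and it retries on any RuntimeException, whereas which failures the real processor retries is not covered here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ConstantBackoffSketch {

    // Runs the task once, then retries up to maxRetries times with the
    // same delay between attempts. Delays are recorded, not slept.
    static <T> T callWithRetries(Supplier<T> task, long delayMillis,
                                 int maxRetries, List<Long> recordedDelays) {
        int retries = 0;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (retries == maxRetries) {
                    throw e; // retries exhausted: surface the last failure
                }
                retries++;
                recordedDelays.add(delayMillis); // a real loop would wait here
            }
        }
    }

    public static void main(String[] args) {
        List<Long> delays = new ArrayList<>();
        int[] calls = {0};
        // Fails twice, then succeeds on the third call.
        String result = callWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("busy");
            }
            return "ok";
        }, 1000L, 3, delays);
        System.out.println(result + " after " + calls[0] + " calls, delays " + delays);
    }
}
```

With a limit of 3 retries, a task that never succeeds is attempted 4 times in total: the initial call plus three retries.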

Once the BulkProcessor is created, requests can be added to it:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

The requests will be executed by the BulkProcessor, which takes care of calling the BulkProcessor.Listener for every bulk request.
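
One way to picture how added requests turn into bulk executions is a small buffer that flushes once an action-count threshold is reached, similar in spirit to setBulkActions. The sketch below is a simplified model, not the BulkProcessor implementation. BatcherSketch and its callback are hypothetical, and size-based flushing, flush intervals, concurrency and retries are all left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatcherSketch<T> {
    private final int maxActions;
    private final Consumer<List<T>> onBatch; // invoked once per flushed batch
    private List<T> pending = new ArrayList<>();

    BatcherSketch(int maxActions, Consumer<List<T>> onBatch) {
        this.maxActions = maxActions;
        this.onBatch = onBatch;
    }

    // Buffers the item and flushes as soon as the threshold is reached.
    void add(T item) {
        pending.add(item);
        if (pending.size() >= maxActions) {
            flush();
        }
    }

    // Hands the buffered items to the callback and starts a new buffer.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>();
        onBatch.accept(batch);
    }

    public static void main(String[] args) {
        BatcherSketch<String> batcher =
                new BatcherSketch<>(2, batch -> System.out.println("executing " + batch));
        batcher.add("one");
        batcher.add("two");   // threshold reached: first batch is executed
        batcher.add("three");
        batcher.flush();      // remaining item is executed explicitly
    }
}
```

In this model the caller only ever calls add, and the callback plays the role of both the bulk execution and the listener notification.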

The listener provides methods to access the BulkRequest and the BulkResponse:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};

Called before each execution of a BulkRequest, this method exposes the number of operations that are going to be executed within the BulkRequest

Called after each execution of a BulkRequest, this method shows whether the BulkResponse contains errors

Called if the BulkRequest failed, this method provides access to the failure

Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available closing methods.

The awaitClose() method can be used to wait until all requests have been processed or the specified waiting time elapses:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

The method returns true if all bulk requests completed and false if the waiting time elapsed before all the bulk requests completed

The close() method can be used to immediately close the BulkProcessor:

bulkProcessor.close();

Both methods flush the requests added to the processor before closing it, and prevent any new requests from being added to it.