Bulk API

Note

The Java High Level REST Client provides the Bulk Processor to assist with bulk requests.

Bulk Request

A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.

It requires at least one operation to be added to the Bulk request:

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts", "doc", "1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  
        .source(XContentType.JSON,"field", "baz"));

Creates the BulkRequest

Adds a first IndexRequest to the Bulk request. See Index API for more information on how to build IndexRequest.

Adds a second IndexRequest

Adds a third IndexRequest

Warning

The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.

Different operation types can also be added to the same BulkRequest:

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

Adds a DeleteRequest to the BulkRequest. See Delete API for more information on how to build DeleteRequest.

Adds an UpdateRequest to the BulkRequest. See Update API for more information on how to build UpdateRequest.

Adds an IndexRequest to the BulkRequest. In this example the source is provided as JSON (XContentType.JSON).

Optional arguments

The following arguments can optionally be provided:

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m"); 

Timeout to wait for the bulk request to be performed as a TimeValue

Timeout to wait for the bulk request to be performed as a String

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");                            

Refresh policy as a WriteRequest.RefreshPolicy instance

Refresh policy as a String

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL); 

Sets the number of shard copies that must be active before proceeding with the index/update/delete operations.

Number of shard copies provided as an ActiveShardCount: can be ActiveShardCount.ALL, ActiveShardCount.ONE or ActiveShardCount.DEFAULT (default)

Synchronous Execution

BulkResponse bulkResponse = client.bulk(request);

Asynchronous Execution

client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
});

Called when the execution is successfully completed. The response is provided as an argument and contains a list of individual results for each operation that was executed. Note that one or more operations might have failed while the others have been successfully executed.

Called when the whole BulkRequest fails. In this case the raised exception is provided as an argument and no operation has been executed.
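A two-callback listener can be adapted to a java.util.concurrent.CompletableFuture when the result needs to be composed with other asynchronous code. The following is a minimal sketch under stated assumptions, not part of the client API: the Listener interface is a hypothetical stand-in that only mirrors the onResponse/onFailure shape of ActionListener, and ListenerFutures is an illustrative helper name, so that the example compiles with the JDK alone.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical stand-in that mirrors the two callbacks of ActionListener.
interface Listener<T> {
    void onResponse(T response);

    void onFailure(Exception e);
}

// Illustrative helper (assumption): not provided by the client library.
final class ListenerFutures {
    private ListenerFutures() {}

    // Starts an asynchronous call that reports through a Listener
    // and exposes its outcome as a CompletableFuture.
    static <T> CompletableFuture<T> toFuture(Consumer<Listener<T>> asyncCall) {
        final CompletableFuture<T> future = new CompletableFuture<>();
        asyncCall.accept(new Listener<T>() {
            @Override
            public void onResponse(T response) {
                // The call completed: hand the response to the future.
                future.complete(response);
            }

            @Override
            public void onFailure(Exception e) {
                // The whole call failed: propagate the exception.
                future.completeExceptionally(e);
            }
        });
        return future;
    }
}
```

With the real client types, the same idea would presumably be applied by replacing the stand-in interface with ActionListener<BulkResponse> and passing a lambda that calls client.bulkAsync(request, listener). A future completed through onResponse can still carry a response in which individual operations failed.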

Bulk Response

The returned BulkResponse contains information about the executed operations and allows iterating over each result as follows:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { 
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { 
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

Iterate over the results of all operations

Retrieve the response of the operation (successful or not). It can be an IndexResponse, UpdateResponse or DeleteResponse, all of which can be treated as DocWriteResponse instances

Handle the response of an index operation

Handle the response of an update operation

Handle the response of a delete operation

The Bulk response provides a method to quickly check whether one or more operations have failed:

if (bulkResponse.hasFailures()) { 

}

This method returns true if at least one operation failed

In such a situation, it is necessary to iterate over all operation results to check whether each operation failed and, if so, retrieve the corresponding failure:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); 

    }
}

Indicate if a given operation failed

Retrieve the failure of the failed operation
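The loop above is often used to build a report of the operations that need attention. The following is a minimal sketch of that pattern, assuming simplified types: ItemResult is a hypothetical stand-in for BulkItemResponse that keeps only an id and an optional failure message, and FailureReport is an illustrative helper name. Neither is part of the client library.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a per-operation result (the real type is BulkItemResponse).
final class ItemResult {
    final String id;
    final String failureMessage; // null when the operation succeeded

    ItemResult(String id, String failureMessage) {
        this.id = id;
        this.failureMessage = failureMessage;
    }

    boolean isFailed() {
        return failureMessage != null;
    }
}

// Illustrative helper (assumption): collects a readable line per failed operation.
final class FailureReport {
    private FailureReport() {}

    // Returns one "id: message" entry per failed item, in the order of the results.
    static List<String> describeFailures(List<ItemResult> items) {
        List<String> lines = new ArrayList<>();
        for (ItemResult item : items) {
            if (item.isFailed()) {
                lines.add(item.id + ": " + item.failureMessage);
            }
        }
        return lines;
    }
}
```

With the real response types, the same loop would read its values from each BulkItemResponse, for example through getId() and getFailureMessage().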

Bulk Processor

The BulkProcessor simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor.

In order to execute the requests, the BulkProcessor requires 3 components:

RestHighLevelClient
This client is used to execute the BulkRequest and to retrieve the BulkResponse.
BulkProcessor.Listener
This listener is called before and after every BulkRequest execution, or when a BulkRequest fails.
ThreadPool
The BulkRequest executions are done using threads from this pool, allowing the BulkProcessor to work in a non-blocking manner and to accept new index/update/delete requests while bulk requests are executing.

Then the BulkProcessor.Builder class can be used to build a new BulkProcessor:

ThreadPool threadPool = new ThreadPool(settings); 

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        
    }
};

BulkProcessor bulkProcessor = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool)
        .build(); 

Create the ThreadPool using the given Settings

Create the BulkProcessor.Listener

This method is called before each execution of a BulkRequest

This method is called after each execution of a BulkRequest

This method is called when a BulkRequest fails

Create the BulkProcessor by calling the build() method from the BulkProcessor.Builder. The RestHighLevelClient.bulkAsync() method will be used to execute the BulkRequest under the hood.

The BulkProcessor.Builder provides methods to configure how the BulkProcessor should handle request execution:

BulkProcessor.Builder builder = new BulkProcessor.Builder(client::bulkAsync, listener, threadPool);
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); 

Set when to flush a new bulk request based on the number of actions currently added (defaults to 1000, use -1 to disable it)

Set when to flush a new bulk request based on the size of actions currently added (defaults to 5 MB, use -1 to disable it)

Set the number of concurrent requests allowed to be executed (defaults to 1, use 0 to allow only a single request to execute at a time)

Set a flush interval after which any pending BulkRequest is flushed (not set by default)

Set a constant back off policy that initially waits for 1 second and retries up to 3 times. See BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff() and BackoffPolicy.exponentialBackoff() for more options.
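To make the meaning of "waits for 1 second and retries up to 3 times" concrete, the following is a minimal, generic sketch of a constant backoff retry loop. It is not the BackoffPolicy implementation: ConstantBackoffRetry is a hypothetical helper, and it retries on any RuntimeException, which is a simplification. The conditions under which the BulkProcessor itself retries are not modeled here.

```java
import java.util.function.Supplier;

// Hypothetical helper (assumption): generic constant backoff, not the library's BackoffPolicy.
final class ConstantBackoffRetry {
    private ConstantBackoffRetry() {}

    // Runs the task once, then retries up to maxRetries times,
    // sleeping the same delayMillis before every retry.
    static <T> T call(Supplier<T> task, long delayMillis, int maxRetries) {
        if (maxRetries < 0) {
            throw new IllegalArgumentException("maxRetries must be >= 0");
        }
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            if (attempt > 0) {
                try {
                    Thread.sleep(delayMillis); // constant delay: identical for every retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while backing off", e);
                }
            }
            try {
                return task.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again if retries remain
            }
        }
        throw last; // every attempt failed: surface the most recent failure
    }
}
```

In this sketch, a delay of 1000 milliseconds with 3 retries means at most four attempts in total, with roughly three seconds spent waiting in the worst case.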

Once the BulkProcessor is created, requests can be added to it:

IndexRequest one = new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

The requests will be executed by the BulkProcessor, which takes care of calling the BulkProcessor.Listener for every bulk request.
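Requests added this way are buffered and sent together once one of the configured thresholds is reached. The following is a simplified model of that behavior, assuming only an action-count limit and a byte-size limit (with -1 disabling a limit, as described for the builder). ThresholdBatcher is a hypothetical class written for illustration. It is not the BulkProcessor implementation, it is not thread-safe, and it models neither the flush interval nor concurrent requests.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified, hypothetical model of threshold-based batching.
final class ThresholdBatcher {
    private final int maxActions;  // flush once this many items are buffered; -1 disables
    private final long maxBytes;   // flush once the buffered size reaches this; -1 disables
    private final Consumer<List<String>> flusher;
    private List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;

    ThresholdBatcher(int maxActions, long maxBytes, Consumer<List<String>> flusher) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.flusher = flusher;
    }

    // Buffers a document and flushes as soon as a limit is reached.
    void add(String document) {
        buffer.add(document);
        bufferedBytes += document.getBytes(StandardCharsets.UTF_8).length;
        if (overLimit()) {
            flush();
        }
    }

    // Hands the current batch to the flusher and starts a new, empty buffer.
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = buffer;
        buffer = new ArrayList<>();
        bufferedBytes = 0;
        flusher.accept(batch);
    }

    private boolean overLimit() {
        return (maxActions != -1 && buffer.size() >= maxActions)
                || (maxBytes != -1 && bufferedBytes >= maxBytes);
    }
}
```

In this model, anything still buffered when no limit has been reached is only sent by an explicit flush() call, which is why a processor of this kind has to be flushed or closed once all requests have been added.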

The listener provides methods to access the BulkRequest and the BulkResponse:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};

Called before each execution of a BulkRequest. This method exposes the number of operations that are going to be executed within the BulkRequest

Called after each execution of a BulkRequest. This method makes it possible to check whether the BulkResponse contains errors

Called if the BulkRequest failed. This method provides access to the failure

Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available closing methods.

The awaitClose() method can be used to wait until all requests have been processed or the specified waiting time elapses:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); 

The method returns true if all bulk requests completed, and false if the waiting time elapsed before all the bulk requests completed.

The close() method can be used to immediately close the BulkProcessor:

bulkProcessor.close();

Both methods flush the requests added to the processor before closing it, and prevent any new requests from being added.