Bulk API
The Java High Level REST Client provides the Bulk Processor to assist with bulk requests.
Bulk Request
A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.
It requires at least one operation to be added to the Bulk request:
// Creates the BulkRequest
BulkRequest request = new BulkRequest();
// Adds a first IndexRequest to the Bulk request
request.add(new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "field", "foo"));
// Adds a second IndexRequest
request.add(new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "field", "bar"));
// Adds a third IndexRequest
request.add(new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "field", "baz"));
The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.
Different operation types can be added to the same BulkRequest:
BulkRequest request = new BulkRequest();
// Adds a DeleteRequest to the BulkRequest
request.add(new DeleteRequest("posts", "doc", "3"));
// Adds an UpdateRequest to the BulkRequest
request.add(new UpdateRequest("posts", "doc", "2")
        .doc(XContentType.JSON, "other", "test"));
// Adds an IndexRequest to the BulkRequest
request.add(new IndexRequest("posts", "doc", "4")
        .source(XContentType.JSON, "field", "baz"));
Optional arguments
The following arguments can optionally be provided through the timeout() method of the BulkRequest:
Timeout to wait for the bulk request to be performed, as a TimeValue
Timeout to wait for the bulk request to be performed, as a String
Synchronous Execution
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
Asynchronous Execution
The asynchronous execution of a bulk request requires both the BulkRequest instance and an ActionListener instance to be passed to the asynchronous method, client.bulkAsync(request, RequestOptions.DEFAULT, listener).
The asynchronous method does not block and returns immediately. Once it is completed, the ActionListener is called back using the onResponse method if the execution completed successfully, or using the onFailure method if it failed.
A typical listener for BulkResponse looks like:
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        // Called when the execution is successfully completed. The response is
        // provided as an argument and contains a list of individual results for
        // each operation that was executed. Note that one or more operations
        // might have failed while the others have been successfully executed.
    }

    @Override
    public void onFailure(Exception e) {
        // Called when the whole BulkRequest fails.
    }
};
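The callback contract of asynchronous execution can be tried out without a cluster. The following is a minimal sketch that uses only the JDK: the Listener interface and the runAsync method are hypothetical stand-ins modeled on ActionListener and an asynchronous client method, not Elasticsearch classes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

final class AsyncCallbackSketch {

    /** Hypothetical stand-in for ActionListener<T>; not an Elasticsearch type. */
    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /**
     * Starts the task on another thread and returns immediately.
     * onFailure is invoked if the task throws, onResponse otherwise.
     */
    static <T> CompletableFuture<Void> runAsync(Callable<T> task, Listener<T> listener) {
        return CompletableFuture.runAsync(() -> {
            T result;
            try {
                result = task.call();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        });
    }

    public static void main(String[] args) {
        Listener<String> printing = new Listener<String>() {
            @Override
            public void onResponse(String response) {
                System.out.println("completed: " + response);
            }

            @Override
            public void onFailure(Exception e) {
                System.out.println("failed: " + e.getMessage());
            }
        };
        // join() is only used here so the demo waits for the callbacks to run.
        runAsync(() -> "3 items", printing).join();
        runAsync(() -> { throw new IllegalStateException("no connection"); }, printing).join();
    }
}
```

The caller gets control back as soon as runAsync returns; the outcome is only observable through the listener, which is why the listener has to handle both paths.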
Bulk Response
The returned BulkResponse contains information about the executed operations and allows iterating over each result as follows:
// Iterate over the results of all operations
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    // Retrieve the response of the operation (successful or not); it can be an
    // IndexResponse, UpdateResponse or DeleteResponse, which can all be seen as
    // DocWriteResponse instances
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
        // Handle the response of an index operation
        IndexResponse indexResponse = (IndexResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
        // Handle the response of an update operation
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
        // Handle the response of a delete operation
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
The Bulk response provides the hasFailures() method to quickly check whether one or more operations have failed.
In that situation it is necessary to iterate over all operation results, check each one with isFailed(), and, for the failed ones, retrieve the corresponding failure with getFailure().
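The two-step check (a cheap overall test, then a per-item scan) can be sketched as follows. This is an illustration using only the JDK: ItemResult is a hypothetical stand-in for BulkItemResponse, and its failure field plays the role of the value returned by getFailure().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class BulkFailureScanSketch {

    /** Hypothetical stand-in for one per-operation result. */
    static final class ItemResult {
        final String id;
        final String failure; // null when the operation succeeded

        ItemResult(String id, String failure) {
            this.id = id;
            this.failure = failure;
        }

        boolean isFailed() {
            return failure != null;
        }
    }

    /** Overall check: true if at least one operation failed. */
    static boolean hasFailures(List<ItemResult> results) {
        for (ItemResult result : results) {
            if (result.isFailed()) {
                return true;
            }
        }
        return false;
    }

    /** Collects "id: reason" for every failed operation, in result order. */
    static List<String> describeFailures(List<ItemResult> results) {
        List<String> descriptions = new ArrayList<>();
        for (ItemResult result : results) {
            if (result.isFailed()) {
                descriptions.add(result.id + ": " + result.failure);
            }
        }
        return descriptions;
    }

    public static void main(String[] args) {
        List<ItemResult> results = Arrays.asList(
                new ItemResult("1", null),
                new ItemResult("2", "version conflict"),
                new ItemResult("3", null));
        // Only pay for the per-item scan when something actually failed.
        if (hasFailures(results)) {
            describeFailures(results).forEach(System.out::println);
        }
    }
}
```

A successful bulk call therefore does not mean that every operation succeeded; the per-item scan is what tells the two cases apart.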
Bulk Processor
The BulkProcessor simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor.
In order to execute the requests, the BulkProcessor requires the following components:
- RestHighLevelClient: this client is used to execute the BulkRequest and to retrieve the BulkResponse.
- BulkProcessor.Listener: this listener is called before and after every BulkRequest execution, or when a BulkRequest fails.
Then the BulkProcessor.builder method can be used to build a new BulkProcessor:
// Create the BulkProcessor.Listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // This method is called before each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // This method is called after each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // This method is called when a BulkRequest failed
    }
};

BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
        (request, bulkListener) ->
            highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
// Create the BulkProcessor by calling build() on the BulkProcessor.Builder;
// the bulkConsumer is what executes each BulkRequest
BulkProcessor bulkProcessor =
        BulkProcessor.builder(bulkConsumer, listener).build();
The BulkProcessor.Builder provides methods to configure how the BulkProcessor should handle request execution:
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
        (request, bulkListener) ->
            highLevelClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
// Set when to flush a new bulk request based on the number of actions
// currently added (defaults to 1000, use -1 to disable it)
builder.setBulkActions(500);
// Set when to flush a new bulk request based on the size of actions
// currently added (defaults to 5 MB, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
// Set the number of concurrent requests allowed to be executed
// (defaults to 1, use 0 to only allow the execution of a single request)
builder.setConcurrentRequests(0);
// Set a flush interval, flushing any pending BulkRequest when the interval passes
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
// Set a constant back off policy that initially waits for 1 second
// and retries up to 3 times. See BackoffPolicy for more options.
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
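To make the action-count and size thresholds concrete, here is a simplified model of a buffer that flushes when either limit is reached. It uses only the JDK and is an assumption-laden sketch, not the BulkProcessor implementation: FlushThresholdSketch, its String "actions" and its sink callback are hypothetical, and concurrency, the flush interval and the back off policy are deliberately left out.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class FlushThresholdSketch {
    private final int maxActions;  // -1 disables the action-count threshold
    private final long maxBytes;   // -1 disables the size threshold
    private final Consumer<List<String>> sink; // hypothetical: receives each flushed batch
    private List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;

    FlushThresholdSketch(int maxActions, long maxBytes, Consumer<List<String>> sink) {
        this.maxActions = maxActions;
        this.maxBytes = maxBytes;
        this.sink = sink;
    }

    /** Buffers one action and flushes if either enabled threshold is reached. */
    void add(String action) {
        pending.add(action);
        pendingBytes += action.getBytes(StandardCharsets.UTF_8).length;
        boolean countReached = maxActions != -1 && pending.size() >= maxActions;
        boolean sizeReached = maxBytes != -1 && pendingBytes >= maxBytes;
        if (countReached || sizeReached) {
            flush();
        }
    }

    /** Hands the buffered actions to the sink as one batch and resets the buffer. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<String> batch = pending;
        pending = new ArrayList<>();
        pendingBytes = 0;
        sink.accept(batch);
    }

    public static void main(String[] args) {
        FlushThresholdSketch buffer = new FlushThresholdSketch(
                3, -1, batch -> System.out.println("flushing " + batch));
        for (String doc : new String[] {"a", "b", "c", "d"}) {
            buffer.add(doc);
        }
        buffer.flush(); // flush the remainder, as closing a processor would
    }
}
```

In this model a lower action count or byte size means smaller, more frequent batches, which is the trade-off the two builder settings control.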
Once the BulkProcessor is created, requests can be added to it:
IndexRequest one = new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
The requests will be executed by the BulkProcessor, which takes care of calling the BulkProcessor.Listener for every bulk request.
The listener provides methods to access the BulkRequest and the BulkResponse:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Called before each execution of a BulkRequest; gives access to the
        // number of operations that are about to be executed
        int numberOfActions = request.numberOfActions();
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // Called after each execution of a BulkRequest; tells whether the
        // BulkResponse contains failures
        if (response.hasFailures()) {
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // Called if the BulkRequest failed; gives access to the failure
        logger.error("Failed to execute bulk", failure);
    }
};
Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available closing methods.
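The awaitClose() method can be used to wait until all requests have been processed or the specified waiting time elapses. It takes a timeout and a TimeUnit, and returns true if all bulk requests completed, or false if the waiting time elapsed before all the bulk requests completed.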
The close() method can be used to immediately close the BulkProcessor:
bulkProcessor.close();
Both methods flush the requests added to the processor before closing it, and forbid any new request from being added to it.
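The "stop accepting work, then wait with a timeout" contract resembles the JDK's ExecutorService.shutdown() followed by awaitTermination(). The sketch below illustrates that contract with JDK classes only; the worker pool is a hypothetical stand-in for the processor, and this is not a claim about how BulkProcessor is implemented.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AwaitCloseSketch {

    /**
     * Stops accepting new work, then waits up to the timeout for the work
     * already submitted. Returns true if everything finished in time, false if
     * the wait timed out or the calling thread was interrupted.
     */
    static boolean awaitClose(ExecutorService worker, long timeout, TimeUnit unit) {
        worker.shutdown(); // submitted tasks still run; new submissions are rejected
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        worker.submit(() -> System.out.println("flushing pending requests"));
        boolean terminated = awaitClose(worker, 30L, TimeUnit.SECONDS);
        System.out.println("all work finished: " + terminated);
    }
}
```

A false return value means some work may still be in flight, so a caller that needs every request acknowledged should check the result rather than discard it.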