Bulk API
The Java High Level REST Client provides the Bulk Processor to assist with bulk requests.
Bulk Request
A BulkRequest can be used to execute multiple index, update and/or delete operations using a single request.
It requires at least one operation to be added to the bulk request:
// Create the BulkRequest
BulkRequest request = new BulkRequest();
// Add a first IndexRequest to the bulk request
request.add(new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "field", "foo"));
// Add a second IndexRequest
request.add(new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "field", "bar"));
// Add a third IndexRequest
request.add(new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "field", "baz"));
The Bulk API supports only documents encoded in JSON or SMILE. Providing documents in any other format will result in an error.
Different operation types can be added to the same BulkRequest:
BulkRequest request = new BulkRequest();
// Add a DeleteRequest to the bulk request
request.add(new DeleteRequest("posts", "3"));
// Add an UpdateRequest to the bulk request
request.add(new UpdateRequest("posts", "2")
        .doc(XContentType.JSON, "other", "test"));
// Add an IndexRequest to the bulk request
request.add(new IndexRequest("posts").id("4")
        .source(XContentType.JSON, "field", "baz"));
Optional arguments
The following arguments can optionally be provided:
Timeout to wait for the bulk request to be performed, set with request.timeout(...) as either a TimeValue or a String (for example "2m").
Refresh policy, set as either a WriteRequest.RefreshPolicy instance or a String:
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");
Number of shard copies that must be active before proceeding with the index/update/delete operations, set with request.waitForActiveShards(...) as either an int or an ActiveShardCount.
Synchronous execution
When executing a BulkRequest in the following manner, the client waits for the BulkResponse to be returned before continuing with code execution:
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
Synchronous calls may throw an IOException if the high-level REST client fails to parse the REST response, if the request times out, or in similar cases where no response comes back from the server.
When the server returns a 4xx or 5xx error code, the high-level client instead tries to parse the error details from the response body and then throws a generic ElasticsearchException, with the original ResponseException added to it as a suppressed exception.
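The suppressed-exception mechanism described above is plain Java, so it can be sketched without the client library. In the sketch below, RuntimeException stands in for the generic ElasticsearchException and IOException stands in for the original ResponseException; the class name, the helper `firstSuppressed`, and `failingCall` are hypothetical names introduced only for illustration.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;

class SuppressedCauseSketch {

    /** Returns the first suppressed exception of the given type, if there is one. */
    static <T extends Throwable> Optional<T> firstSuppressed(Throwable error, Class<T> type) {
        return Arrays.stream(error.getSuppressed())
                .filter(type::isInstance)
                .map(type::cast)
                .findFirst();
    }

    /** Stand-in for a failing call: a generic exception that carries the low-level one. */
    static void failingCall() {
        RuntimeException generic = new RuntimeException("parsed error details");
        generic.addSuppressed(new IOException("original low-level response error"));
        throw generic;
    }

    public static void main(String[] args) {
        try {
            failingCall();
        } catch (RuntimeException e) {
            // Look past the generic exception to the suppressed original, if present.
            String original = firstSuppressed(e, IOException.class)
                    .map(Throwable::getMessage)
                    .orElse("none");
            System.out.println(e.getMessage() + " / suppressed: " + original);
        }
    }
}
```

With the real client, the same lookup would presumably be applied to the caught ElasticsearchException, asking for ResponseException instead of IOException.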
Asynchronous execution
Executing a BulkRequest can also be done asynchronously so that the client can return immediately. Users need to specify how the response or potential failures will be handled by passing the request and a listener to the asynchronous bulk method, client.bulkAsync(request, RequestOptions.DEFAULT, listener).
The asynchronous method does not block and returns immediately. Once it is completed, the ActionListener is called back using the onResponse method if the execution completed successfully, or using the onFailure method if it failed. Failure scenarios and expected exceptions are the same as in the synchronous execution case.
A typical listener for bulk is an ActionListener<BulkResponse> that implements both onResponse and onFailure.
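The callback flow can be sketched as follows. To keep the example runnable without the client library, `ActionListener` here is a local stand-in interface with the same two callbacks, and `bulkAsync` is a hypothetical method that only simulates a call returning immediately and reporting its outcome later; neither is the library's own code.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class AsyncListenerSketch {

    /** Local stand-in mirroring the two callbacks named in the text. */
    interface ActionListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    /** Hypothetical async call: returns immediately and notifies the listener later. */
    static void bulkAsync(boolean succeed, ActionListener<String> listener) {
        CompletableFuture.runAsync(() -> {
            if (succeed) {
                listener.onResponse("bulk response");
            } else {
                listener.onFailure(new IOException("no response from server"));
            }
        });
    }

    /** Issues one call, then waits (for demonstration only) until a callback has fired. */
    static String runOnce(boolean succeed) {
        CountDownLatch done = new CountDownLatch(1);
        String[] outcome = new String[1];
        bulkAsync(succeed, new ActionListener<String>() {
            @Override
            public void onResponse(String response) {
                outcome[0] = "onResponse: " + response;
                done.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                outcome[0] = "onFailure: " + e.getMessage();
                done.countDown();
            }
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                return "timed out";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(runOnce(true));
        System.out.println(runOnce(false));
    }
}
```

The latch exists only so the demonstration can print a result; real code would normally continue its work inside the callbacks rather than block on them.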
Bulk Response
The returned BulkResponse contains information about the executed operations and allows iterating over each result as follows:
// Iterate over the results of all operations
for (BulkItemResponse bulkItemResponse : bulkResponse) {
    // Retrieve the response of the operation (successful or not); it can be an
    // IndexResponse, UpdateResponse or DeleteResponse, all held as a DocWriteResponse
    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

    switch (bulkItemResponse.getOpType()) {
    case INDEX:
    case CREATE:
        // Handle the response of an index operation
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:
        // Handle the response of an update operation
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:
        // Handle the response of a delete operation
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
The Bulk response provides a method, hasFailures(), to quickly check whether one or more operations failed.
In that case, iterate over all operation results, check whether each operation failed with isFailed(), and if so retrieve the corresponding failure with getFailure().
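The two-step check described above (a quick overall test, then a pass over the individual results) can be sketched without the client library. `ItemResult` is a hypothetical stand-in for a per-operation result, and `hasFailures` and `describeFailures` are illustrative helpers, not the library's own methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BulkFailureSketch {

    /** Stand-in for one per-operation result; failureMessage is null on success. */
    static final class ItemResult {
        final String id;
        final String failureMessage;

        ItemResult(String id, String failureMessage) {
            this.id = id;
            this.failureMessage = failureMessage;
        }

        boolean isFailed() {
            return failureMessage != null;
        }
    }

    /** Quick check: true if at least one operation failed. */
    static boolean hasFailures(List<ItemResult> items) {
        for (ItemResult item : items) {
            if (item.isFailed()) {
                return true;
            }
        }
        return false;
    }

    /** Full pass: collects "id: message" for every failed operation. */
    static List<String> describeFailures(List<ItemResult> items) {
        List<String> failures = new ArrayList<>();
        for (ItemResult item : items) {
            if (item.isFailed()) {
                failures.add(item.id + ": " + item.failureMessage);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<ItemResult> items = Arrays.asList(
                new ItemResult("1", null),
                new ItemResult("2", "version conflict"),
                new ItemResult("3", null));
        // Only walk the individual results when the quick check reports a problem.
        if (hasFailures(items)) {
            describeFailures(items).forEach(System.out::println);
        }
    }
}
```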
Bulk Processor
The BulkProcessor simplifies the usage of the Bulk API by providing a utility class that allows index/update/delete operations to be transparently executed as they are added to the processor.
In order to execute the requests, the BulkProcessor requires the following components:
- RestHighLevelClient: this client is used to execute the BulkRequest and to retrieve the BulkResponse
- BulkProcessor.Listener: this listener is called before and after every BulkRequest execution, or when a BulkRequest failed
Then the BulkProcessor.builder method can be used to build a new BulkProcessor:
// Create the BulkProcessor.Listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        // Called before each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        // Called after each execution of a BulkRequest
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        // Called when a BulkRequest failed
    }
};

// Create the BulkProcessor by calling build() on the BulkProcessor.Builder
BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();
The BulkProcessor.Builder provides methods to configure how the BulkProcessor should handle request execution:
BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
// Set when to flush a new bulk request based on the number of actions
// currently added (defaults to 1000, use -1 to disable it)
builder.setBulkActions(500);
// Set when to flush a new bulk request based on the size of actions
// currently added (defaults to 5Mb, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
// Set the number of concurrent requests allowed to be executed
// (defaults to 1, use 0 to only allow the execution of a single request)
builder.setConcurrentRequests(0);
// Set a flush interval: any pending BulkRequest is flushed once the interval passes
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
// Set a constant back off policy that initially waits for 1 second
// and retries up to 3 times. See BackoffPolicy for more options.
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
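To make the constant back off setting concrete, here is a minimal sketch of the wait schedule it implies: the same delay before each retry, up to the retry limit. It uses only the JDK; `constantDelays` and `totalWait` are hypothetical helpers, and the sketch does not model which failures the processor actually retries.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSketch {

    /** Delays for a constant policy: the same wait before each of maxRetries retries. */
    static List<Duration> constantDelays(Duration delay, int maxRetries) {
        List<Duration> delays = new ArrayList<>();
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
        }
        return delays;
    }

    /** Upper bound on the time spent waiting between retries. */
    static Duration totalWait(List<Duration> delays) {
        Duration total = Duration.ZERO;
        for (Duration d : delays) {
            total = total.plus(d);
        }
        return total;
    }

    public static void main(String[] args) {
        // Mirrors constantBackoff(TimeValue.timeValueSeconds(1L), 3) from the text.
        List<Duration> delays = constantDelays(Duration.ofSeconds(1), 3);
        System.out.println(delays + " total " + totalWait(delays));
    }
}
```

With a 1 second delay and 3 retries, a request that keeps failing spends at most 3 seconds waiting between attempts before the failure is reported.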
Once the BulkProcessor is created, requests can be added to it:
IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);
The requests will be executed by the BulkProcessor, which takes care of calling the BulkProcessor.Listener for every bulk request.
The listener provides methods to access the BulkRequest and the BulkResponse:
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    // Called before each execution of a BulkRequest
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions();
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    // Called after each execution of a BulkRequest
    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) {
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    // Called if the BulkRequest failed
    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        logger.error("Failed to execute bulk", failure);
    }
};
Once all requests have been added to the BulkProcessor, its instance needs to be closed using one of the two available closing methods.
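The awaitClose() method can be used to wait until all requests have been processed or the specified waiting time elapses, for example bulkProcessor.awaitClose(30L, TimeUnit.SECONDS).
The method returns true if all bulk requests completed and false if the waiting time elapsed before all the bulk requests completed.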
The close() method can be used to immediately close the BulkProcessor:
bulkProcessor.close();
Both methods flush the requests added to the processor before closing it, and both prevent any new requests from being added afterwards.
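The add, flush and close behaviour described in this section can be modelled in a few lines. This is a simplified sketch and not the BulkProcessor implementation: `FlushModelSketch` is a hypothetical class that buffers plain strings, flushes when an action-count threshold is reached (with -1 disabling the threshold, as for setBulkActions), flushes whatever is left on close, and rejects additions afterwards. Size thresholds, flush intervals and concurrency are left out.

```java
import java.util.ArrayList;
import java.util.List;

class FlushModelSketch {

    private final int bulkActions;                          // flush threshold; -1 disables it
    private final List<String> buffer = new ArrayList<>();  // actions waiting to be sent
    private final List<List<String>> flushed = new ArrayList<>();
    private boolean closed;

    FlushModelSketch(int bulkActions) {
        this.bulkActions = bulkActions;
    }

    /** Buffers one action and flushes if the action-count threshold is reached. */
    void add(String action) {
        if (closed) {
            throw new IllegalStateException("processor already closed");
        }
        buffer.add(action);
        if (bulkActions != -1 && buffer.size() >= bulkActions) {
            flush();
        }
    }

    /** Flushes any remaining actions, then refuses further additions. */
    void close() {
        if (closed) {
            return;
        }
        closed = true;
        flush();
    }

    private void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    /** Batches sent so far, in order. */
    List<List<String>> flushedBatches() {
        return flushed;
    }

    public static void main(String[] args) {
        FlushModelSketch processor = new FlushModelSketch(2);
        processor.add("one");
        processor.add("two");    // threshold reached: first batch is flushed
        processor.add("three");  // stays buffered until close()
        processor.close();       // flushes the remaining action
        System.out.println(processor.flushedBatches());
    }
}
```

In this model three additions with a threshold of 2 produce one batch on the second add and a second, smaller batch on close, which is the reason the processor has to be closed once all requests have been added.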