Bulk: indexing multiple documents

Bulk requests allow sending multiple document-related operations to Elasticsearch in one request. When you have multiple documents to ingest, this is more efficient than sending each document with a separate request.

A bulk request can contain several kinds of operations:

  • create a document, indexing it after ensuring it doesn’t already exist,
  • index a document, creating it if needed and replacing it if it exists,
  • update an existing document in place, either with a script or a partial document,
  • delete a document.

See the Elasticsearch API documentation for a full explanation of bulk requests.
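
For orientation, at the REST level a bulk request body is newline-delimited JSON: every operation starts with an action line, and create, index and update are each followed by a source line, while delete is not. The sketch below prints such a body using only the Java standard library. The class name BulkBodySketch, the index name and the document values are made up for illustration; the Java API Client builds this body for you, so this is not how the client is meant to be used.

```java
import java.util.List;

public class BulkBodySketch {

    // Builds an action line such as {"index":{"_index":"products","_id":"1"}}
    static String action(String type, String index, String id) {
        return "{\"" + type + "\":{\"_index\":\"" + index + "\",\"_id\":\"" + id + "\"}}";
    }

    // Assembles one operation of each kind; values are illustrative only
    static String body() {
        List<String> lines = List.of(
            action("create", "products", "1"), "{\"name\":\"bike\"}",
            action("index", "products", "2"), "{\"name\":\"helmet\"}",
            action("update", "products", "2"), "{\"doc\":{\"price\":30}}",
            action("delete", "products", "3") // delete has no source line
        );
        // Every line, including the last one, ends with a newline
        return String.join("\n", lines) + "\n";
    }

    public static void main(String[] args) {
        System.out.print(body());
    }
}
```

Seven lines are produced for four operations, which shows why the operation kinds are modelled as distinct variants: each carries a different payload.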

Indexing application objects

A BulkRequest contains a collection of operations, each operation being a type with several variants. To create this request, it is convenient to use a builder object for the main request, and the fluent DSL for each operation.

The example below shows how to index a list of application objects.

List<Product> products = fetchProducts();

BulkRequest.Builder br = new BulkRequest.Builder();

for (Product product : products) {
    br.operations(op -> op
        .index(idx -> idx
            .index("products")
            .id(product.getSku())
            .document(product)
        )
    );
}

BulkResponse result = esClient.bulk(br.build());

// Log errors, if any
if (result.errors()) {
    logger.error("Bulk had errors");
    for (BulkResponseItem item: result.items()) {
        if (item.error() != null) {
            logger.error(item.error().reason());
        }
    }
}

The call to br.operations(...) adds an operation (remember that list properties are additive). op is a builder for BulkOperation, which is a variant type. This type has index, create, update and delete variants.

The call to op.index(...) selects the index operation variant. idx is a builder for IndexOperation.

The calls on idx set the properties of the index operation, similar to single document indexing: index name, identifier and document.
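
The remark that list properties are additive can be illustrated without the client. The sketch below uses a hypothetical stand-in, RequestBuilder, which is not the client's actual BulkRequest.Builder: each call to operations appends to the list instead of replacing it, which is why the loop in the example above accumulates one operation per product. The SKU values are invented.

```java
import java.util.ArrayList;
import java.util.List;

public class AdditiveBuilderSketch {

    // Hypothetical builder that mimics the additive behaviour of list properties
    static class RequestBuilder {
        private final List<String> operations = new ArrayList<>();

        RequestBuilder operations(String op) {
            operations.add(op); // appends, never overwrites earlier values
            return this;
        }

        List<String> build() {
            return List.copyOf(operations);
        }
    }

    // Calls operations() once per item, as the bulk indexing loop does
    static List<String> demo() {
        RequestBuilder br = new RequestBuilder();
        for (String sku : List.of("bk-1", "bk-2", "bk-3")) {
            br.operations("index:" + sku);
        }
        return br.build();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```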

Indexing raw JSON data

The document property of a bulk index request can be any object that can be serialized to JSON using your Elasticsearch client’s JSON mapper. In the example below we use the Java API Client’s JsonData object to read JSON files from a log directory and send them in a bulk request.

Since JsonData doesn’t allow reading directly from an input stream (this will be added in a future release), we will use the following function for that:

public static JsonData readJson(InputStream input, ElasticsearchClient esClient) {
    JsonpMapper jsonpMapper = esClient._transport().jsonpMapper();
    JsonProvider jsonProvider = jsonpMapper.jsonProvider();

    return JsonData.from(jsonProvider.createParser(input), jsonpMapper);
}

We can now read the contents of the log directory and send it to Elasticsearch:

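// List JSON log files in the log directory
File[] logFiles = logDir.listFiles(
    file -> file.getName().matches("log-.*\\.json")
);

BulkRequest.Builder br = new BulkRequest.Builder();

for (File file: logFiles) {
    // Close each file once its content has been parsed
    try (InputStream input = new FileInputStream(file)) {
        JsonData json = readJson(input, esClient);

        br.operations(op -> op
            .index(idx -> idx
                .index("logs")
                .document(json)
            )
        );
    }
}

// Send all log documents in a single bulk request
BulkResponse result = esClient.bulk(br.build());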
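
A note on the file filter used in this listing: String.matches requires the whole file name to match the pattern, so only names that start with log- and end with .json are selected. The standalone sketch below checks this against a few invented file names; the class name LogFileFilterSketch is hypothetical and the code is not part of the client. Keep in mind as well that File.listFiles returns null when the path is not a directory or cannot be read, so production code may want to check for that before looping.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LogFileFilterSketch {

    // Same pattern as in the listing; matches() must cover the whole name
    static boolean isLogFile(String name) {
        return name.matches("log-.*\\.json");
    }

    // Filters a list of made-up file names with the pattern above
    static List<String> demo() {
        return List.of(
                "log-2023-01.json", // selected
                "log-.json",        // selected: ".*" may match nothing
                "log-1.json.bak",   // rejected: does not end with .json
                "app-log-1.json",   // rejected: does not start with log-
                "log.json")         // rejected: missing the dash
            .stream()
            .filter(LogFileFilterSketch::isLogFile)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```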

The source code for the examples above can be found in the Java API Client tests.