Bulk indexing
When ingesting many documents, use the Bulk API to send multiple operations in a single request. For the full bulk API reference, see the Bulk API documentation.
With the low-level client, build the NDJSON payload yourself and submit it with Bulk():
```go
client, err := elasticsearch.NewDefaultClient()
if err != nil {
	// Handle error.
}
defer func() {
	if err := client.Close(context.Background()); err != nil {
		// Handle error.
	}
}()

var buf bytes.Buffer
buf.WriteString(`{ "index" : { "_index" : "test", "_id" : "1" } }` + "\n")
buf.WriteString(`{ "title" : "Test" }` + "\n")

res, err := client.Bulk(bytes.NewReader(buf.Bytes()))
if err != nil {
	// Handle error.
}
defer res.Body.Close()
```
- Each operation line specifies the action and metadata.
- The document body follows on the next line. The final line must end with `\n`.
- Send the NDJSON payload as an `io.Reader`.
With the typed client, you can build a bulk request by appending operations and then executing it:
```go
es, err := elasticsearch.NewTypedClient(elasticsearch.Config{})
if err != nil {
	// Handle error.
}

index := "my-index"
id1 := "1"
id2 := "2"

bulk := es.Bulk()
if err := bulk.IndexOp(
	types.IndexOperation{Index_: &index, Id_: &id1},
	map[string]any{"title": "Test 1"},
); err != nil {
	// Handle error.
}
if err := bulk.IndexOp(
	types.IndexOperation{Index_: &index, Id_: &id2},
	map[string]any{"title": "Test 2"},
); err != nil {
	// Handle error.
}

res, err := bulk.Do(context.Background())
if err != nil {
	// Handle error.
}
if res.Errors {
	// One or more operations failed.
}
```
- Create a bulk request builder.
- Append index operations with metadata and document body.
- Execute the bulk request.
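Because IndexOp can be called repeatedly before Do, operations can also be appended in a loop when the documents come from a slice. A sketch under the same setup as above (the `docs` slice and ID scheme are illustrative; it assumes a typed client `es` and an `index` variable as in the previous example, plus a running cluster):

```go
docs := []map[string]any{
	{"title": "Test 1"},
	{"title": "Test 2"},
}

bulk := es.Bulk()
for i, doc := range docs {
	// Each iteration gets its own id variable, so taking its address is safe.
	id := strconv.Itoa(i + 1)
	if err := bulk.IndexOp(types.IndexOperation{Index_: &index, Id_: &id}, doc); err != nil {
		// Handle error.
	}
}

res, err := bulk.Do(context.Background())
if err != nil {
	// Handle error.
}
if res.Errors {
	// One or more operations failed.
}
```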
The client repository contains complete, runnable examples for bulk ingestion (manual NDJSON, esutil.BulkIndexer, typed bulk, benchmarks, Kafka ingestion): _examples/bulk.
For a higher-level API that takes care of batching, flushing, and concurrency, use the esutil.BulkIndexer helper.
The BulkIndexer is designed to be long-lived: create it once, keep adding items over time (potentially from multiple goroutines), and call Close() once when you are done (for example with defer).
```go
client, err := elasticsearch.NewDefaultClient()
if err != nil {
	// Handle error.
}
defer func() {
	if err := client.Close(context.Background()); err != nil {
		// Handle error.
	}
}()

ctx := context.Background()

indexer, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
	Client:     client,
	Index:      "test",
	NumWorkers: 4,
	FlushBytes: 5_000_000,
})
if err != nil {
	// Handle error.
}
defer func() {
	if err := indexer.Close(ctx); err != nil {
		// Handle error.
	}
}()

if err := indexer.Add(ctx, esutil.BulkIndexerItem{
	Action:     "index",
	DocumentID: "1",
	Body:       strings.NewReader(`{"title":"Test"}`),
}); err != nil {
	// Handle error.
}
```
- The Elasticsearch client instance.
- The default index name for all items.
- Number of concurrent worker goroutines.
- Flush threshold in bytes (flush when the buffer reaches this size).
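Per-item outcomes are reported through the optional OnSuccess and OnFailure callbacks on each BulkIndexerItem, and aggregate counters are available from the indexer's Stats method. A sketch assuming the `indexer` and `ctx` from the example above (the counter is illustrative; callbacks may run concurrently, hence the atomic increment):

```go
var failed uint64

_ = indexer.Add(ctx, esutil.BulkIndexerItem{
	Action:     "index",
	DocumentID: "2",
	Body:       strings.NewReader(`{"title":"Test 2"}`),
	OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, res esutil.BulkIndexerResponseItem, err error) {
		// Called when this item is rejected or its bulk request fails.
		atomic.AddUint64(&failed, 1)
	},
})

// After Close(), Stats reports aggregate counters for the indexer's lifetime.
stats := indexer.Stats()
log.Printf("flushed %d documents, %d failures", stats.NumFlushed, stats.NumFailed)
```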
BulkIndexerConfig full reference
The esutil.BulkIndexerConfig struct supports the following fields:
| Field | Type | Description |
|---|---|---|
| Client | *elasticsearch.Client | The Elasticsearch client (required). |
| Index | string | Default index name for items that don't specify one. |
| NumWorkers | int | Number of concurrent worker goroutines (default: number of CPUs). |
| FlushBytes | int | Flush threshold in bytes. |
| FlushInterval | time.Duration | Periodic flush interval. |
| Pipeline | string | Default ingest pipeline for all items. |
| Refresh | string | Refresh policy after each flush ("true", "false", "wait_for"). |
| Routing | string | Default routing value for all items. |
| Timeout | time.Duration | Timeout for each bulk request. |
| OnError | func(context.Context, error) | Callback invoked when a bulk request fails. |
| OnFlushStart | func(context.Context) context.Context | Callback invoked before each flush. |
| OnFlushEnd | func(context.Context) | Callback invoked after each flush. |
| Decoder | BulkResponseJSONDecoder | Custom JSON decoder for bulk responses. |
| DebugLogger | BulkIndexerDebugLogger | Logger for debug output. |