Update By Query API

Updates documents that match the specified query. If no query is specified, performs an update on every document in the data stream or index without modifying the source, which is useful for picking up mapping changes.

resp = client.update_by_query(
    index="my-index-000001",
    conflicts="proceed",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  conflicts: 'proceed'
)
puts response
POST my-index-000001/_update_by_query?conflicts=proceed

Request

POST /<target>/_update_by_query

Prerequisites

  • If the Elasticsearch security features are enabled, you must have the following index privileges for the target data stream, index, or alias:

    • read
    • index or write

Description

You can specify the query criteria in the request URI or the request body using the same syntax as the Search API.

When you submit an update by query request, Elasticsearch gets a snapshot of the data stream or index when it begins processing the request and updates matching documents using internal versioning. When the versions match, the document is updated and the version number is incremented. If a document changes between the time that the snapshot is taken and the update operation is processed, it results in a version conflict and the operation fails. You can opt to count version conflicts instead of halting and returning by setting conflicts to proceed. If you opt to count version conflicts, the operation could attempt to update more documents from the source than max_docs until it has successfully updated max_docs documents or has gone through every document in the source query.
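
The difference between conflicts=abort and conflicts=proceed can be illustrated with a small in-memory model. This is a toy sketch, not how Elasticsearch is implemented: apply_updates, snapshot, and live are hypothetical names, versions are plain integers, and the model aborts on the first conflict rather than at the end of a batch.

```python
def apply_updates(snapshot, live, conflicts="abort"):
    """Toy model of update by query versioning (illustrative only).

    snapshot: {doc_id: version} as seen when the request started.
    live:     {doc_id: version} current state, possibly changed since then.
    Returns counters loosely shaped like the response body.
    """
    if conflicts not in ("abort", "proceed"):
        raise ValueError("conflicts must be 'abort' or 'proceed'")
    result = {"updated": 0, "version_conflicts": 0, "aborted": False}
    for doc_id, seen_version in snapshot.items():
        if live.get(doc_id) == seen_version:
            # Versions match: update the document and increment its version.
            live[doc_id] = seen_version + 1
            result["updated"] += 1
        else:
            # The document changed after the snapshot was taken.
            result["version_conflicts"] += 1
            if conflicts == "abort":
                result["aborted"] = True
                break  # updates already applied are kept, not rolled back
    return result


snapshot = {"a": 1, "b": 1, "c": 1}
live = {"a": 1, "b": 2, "c": 1}  # "b" was modified after the snapshot

print(apply_updates(snapshot, dict(live), conflicts="abort"))
# {'updated': 1, 'version_conflicts': 1, 'aborted': True}
print(apply_updates(snapshot, dict(live), conflicts="proceed"))
# {'updated': 2, 'version_conflicts': 1, 'aborted': False}
```

With abort, document c is never reached; with proceed, the conflict on b is only counted and c is still updated.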

Documents with a version equal to 0 cannot be updated using update by query because internal versioning does not support 0 as a valid version number.

While processing an update by query request, Elasticsearch performs multiple search requests sequentially to find all of the matching documents. A bulk update request is performed for each batch of matching documents. Any query or update failures cause the update by query request to fail and the failures are shown in the response. Any update requests that completed successfully still stick; they are not rolled back.

Refreshing shards

Specifying the refresh parameter refreshes all shards once the request completes. This is different from the update API’s refresh parameter, which causes just the shard that received the request to be refreshed. Unlike the update API, update by query does not support wait_for.

Running update by query asynchronously

If the request contains wait_for_completion=false, Elasticsearch performs some preflight checks, launches the request, and returns a task you can use to cancel or get the status of the task. Elasticsearch creates a record of this task as a document at .tasks/task/${taskId}.
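
A minimal sketch of the asynchronous flow, assuming the official Python client used in the examples on this page. wait_for_task is a hypothetical helper, not part of the client; it assumes the Task API response carries a completed flag and that the asynchronous call returns the task ID under the task key.

```python
import time


def wait_for_task(client, task_id, poll_seconds=5.0, sleep=time.sleep):
    """Poll the Task API until the task reports completion, then return it.

    `client` is anything exposing `tasks.get(task_id=...)`; `sleep` is
    injectable so the loop can be exercised without real waiting.
    """
    while True:
        task = client.tasks.get(task_id=task_id)
        if task["completed"]:  # assumed field of the Task API response
            return task
        sleep(poll_seconds)


# Usage against a real cluster (assumes `client` is already connected):
# resp = client.update_by_query(
#     index="my-index-000001",
#     conflicts="proceed",
#     wait_for_completion=False,
# )
# finished = wait_for_task(client, resp["task"])
```

Polling keeps the caller in control of timeouts; the task document stored under .tasks still has to be cleaned up separately.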

Waiting for active shards

wait_for_active_shards controls how many copies of a shard must be active before proceeding with the request. See Active shards for details. timeout controls how long each write request waits for unavailable shards to become available. Both work exactly the way they work in the Bulk API.

Update by query uses scrolled searches, so you can also specify the scroll parameter to control how long it keeps the search context alive, for example ?scroll=10m. The default is 5 minutes.

Throttling update requests

To control the rate at which update by query issues batches of update operations, you can set requests_per_second to any positive decimal number. This pads each batch with a wait time to throttle the rate. Set requests_per_second to -1 to disable throttling.

Throttling uses a wait time between batches so that the internal scroll requests can be given a timeout that takes the request padding into account. The padding time is the difference between the batch size divided by the requests_per_second and the time spent writing. By default the batch size is 1000, so if requests_per_second is set to 500:

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - write_time = 2 seconds - .5 seconds = 1.5 seconds

Since the batch is issued as a single _bulk request, large batch sizes cause Elasticsearch to create many requests and wait before starting the next set. This is "bursty" instead of "smooth".
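
The padding calculation above can be written as a small function. This is only a sketch of the arithmetic: throttle_wait_seconds is a hypothetical name, and clamping the result at zero when a write takes longer than the target time is an assumption, not something stated above.

```python
def throttle_wait_seconds(batch_size, requests_per_second, write_seconds):
    """Return the wait time added after one batch, per the formula above."""
    if requests_per_second == -1:
        return 0.0  # throttling disabled
    if requests_per_second <= 0:
        raise ValueError("requests_per_second must be positive or -1")
    target_seconds = batch_size / requests_per_second
    # Assumption: never wait a negative amount if the write was slow.
    return max(0.0, target_seconds - write_seconds)


print(throttle_wait_seconds(1000, 500, 0.5))  # 1.5
print(throttle_wait_seconds(1000, -1, 0.5))   # 0.0
```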

Slicing

Update by query supports sliced scroll to parallelize the update process. This can improve efficiency and provide a convenient way to break the request down into smaller parts.

Setting slices to auto chooses a reasonable number for most data streams and indices. If you’re slicing manually or otherwise tuning automatic slicing, keep in mind that:

  • Query performance is most efficient when the number of slices is equal to the number of shards in the index or backing index. If that number is large (for example, 500), choose a lower number because too many slices hurt performance. Setting slices higher than the number of shards generally does not improve efficiency and adds overhead.
  • Update performance scales linearly across available resources with the number of slices.

Whether query or update performance dominates the runtime depends on the documents being reindexed and cluster resources.

Path parameters

<target>
(Optional, string) Comma-separated list of data streams, indices, and aliases to search. Supports wildcards (*). To search all data streams or indices, omit this parameter or use * or _all.

Query parameters

allow_no_indices

(Optional, Boolean) If false, the request returns an error if any wildcard expression, index alias, or _all value targets only missing or closed indices. This behavior applies even if the request targets other open indices. For example, a request targeting foo*,bar* returns an error if an index starts with foo but no index starts with bar.

Defaults to true.

analyzer

(Optional, string) Analyzer to use for the query string.

This parameter can only be used when the q query string parameter is specified.

analyze_wildcard

(Optional, Boolean) If true, wildcard and prefix queries are analyzed. Defaults to false.

This parameter can only be used when the q query string parameter is specified.

conflicts
(Optional, string) What to do if update by query hits version conflicts: abort or proceed. Defaults to abort.
default_operator

(Optional, string) The default operator for query string query: AND or OR. Defaults to OR.

This parameter can only be used when the q query string parameter is specified.

df

(Optional, string) Field to use as default where no field prefix is given in the query string.

This parameter can only be used when the q query string parameter is specified.

expand_wildcards

(Optional, string) Type of index that wildcard patterns can match. If the request can target data streams, this argument determines whether wildcard expressions match hidden data streams. Supports comma-separated values, such as open,hidden. Valid values are:

all
Match any data stream or index, including hidden ones.
open
Match open, non-hidden indices. Also matches any non-hidden data stream.
closed
Match closed, non-hidden indices. Also matches any non-hidden data stream. Data streams cannot be closed.
hidden
Match hidden data streams and hidden indices. Must be combined with open, closed, or both.
none
Wildcard patterns are not accepted.

Defaults to open.

ignore_unavailable
(Optional, Boolean) If false, the request returns an error if it targets a missing or closed index. Defaults to false.
lenient

(Optional, Boolean) If true, format-based query failures (such as providing text to a numeric field) in the query string will be ignored. Defaults to false.

This parameter can only be used when the q query string parameter is specified.

max_docs
(Optional, integer) Maximum number of documents to process. Defaults to all documents. When set to a value less than or equal to scroll_size, a scroll is not used to retrieve the results for the operation.
pipeline
(Optional, string) ID of the pipeline to use to preprocess incoming documents. If the index has a default ingest pipeline specified, then setting the value to _none disables the default ingest pipeline for this request. If a final pipeline is configured it will always run, regardless of the value of this parameter.
preference
(Optional, string) Specifies the node or shard the operation should be performed on. Random by default.
q
(Optional, string) Query in the Lucene query string syntax.
request_cache
(Optional, Boolean) If true, the request cache is used for this request. Defaults to the index-level setting.
refresh
(Optional, Boolean) If true, Elasticsearch refreshes affected shards to make the operation visible to search. Defaults to false.
requests_per_second
(Optional, float) The throttle for this request in sub-requests per second. Accepts any positive decimal number. Defaults to -1 (no throttle).
routing
(Optional, string) Custom value used to route operations to a specific shard.
scroll
(Optional, time value) Period to retain the search context for scrolling. See Scroll search results.
scroll_size
(Optional, integer) Size of the scroll request that powers the operation. Defaults to 1000.
search_type

(Optional, string) The type of the search operation. Available options:

  • query_then_fetch
  • dfs_query_then_fetch
search_timeout
(Optional, time units) Explicit timeout for each search request. Defaults to no timeout.
slices
(Optional, integer) The number of slices this task should be divided into. Defaults to 1 meaning the task isn’t sliced into subtasks.
sort
(Optional, string) A comma-separated list of <field>:<direction> pairs.
stats
(Optional, string) Specific tag of the request for logging and statistical purposes.
terminate_after

(Optional, integer) Maximum number of documents to collect for each shard. If a query reaches this limit, Elasticsearch terminates the query early. Elasticsearch collects documents before sorting.

Use with caution. Elasticsearch applies this parameter to each shard handling the request. When possible, let Elasticsearch perform early termination automatically. Avoid specifying this parameter for requests that target data streams with backing indices across multiple data tiers.

timeout

(Optional, time units) Period each update request waits for unavailable shards to become available.

Defaults to 1m (one minute). This guarantees Elasticsearch waits for at least the timeout before failing. The actual wait time could be longer, particularly when multiple waits occur.

version
(Optional, Boolean) If true, returns the document version as part of a hit.
wait_for_active_shards

(Optional, string) The number of shard copies that must be active before proceeding with the operation. Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1). Default: 1, the primary shard.

See Active shards.

Request body

query
(Optional, query object) Specifies the documents to update using the Query DSL.

Response body

took
The number of milliseconds from start to end of the whole operation.
timed_out
This flag is set to true if any of the requests executed during the update by query execution has timed out.
total
The number of documents that were successfully processed.
updated
The number of documents that were successfully updated.
deleted
The number of documents that were successfully deleted.
batches
The number of scroll responses pulled back by the update by query.
version_conflicts
The number of version conflicts that the update by query hit.
noops
The number of documents that were ignored because the script used for the update by query returned a noop value for ctx.op.
retries
The number of retries attempted by update by query. bulk is the number of bulk actions retried, and search is the number of search actions retried.
throttled_millis
Number of milliseconds the request slept to conform to requests_per_second.
requests_per_second
The number of requests per second effectively executed during the update by query.
throttled_until_millis
This field should always be equal to zero in an _update_by_query response. It only has meaning when using the Task API, where it indicates the next time (in milliseconds since epoch) a throttled request will be executed again in order to conform to requests_per_second.
failures
Array of failures if there were any unrecoverable errors during the process. If this is non-empty then the request aborted because of those failures. Update by query is implemented using batches. Any failure causes the entire process to abort, but all failures in the current batch are collected into the array. You can use the conflicts option to prevent the update by query from aborting on version conflicts.
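
As a rough illustration of how these fields might be read together, the sketch below inspects a response body held as a plain dictionary. summarize is a hypothetical helper and the sample values are made up for the example.

```python
def summarize(resp):
    """Return (ok, message) for an update by query response body."""
    failures = resp.get("failures", [])
    if failures:
        # A non-empty failures array means the request aborted.
        return False, f"aborted with {len(failures)} failure(s)"
    if resp.get("timed_out"):
        return False, "at least one request timed out"
    message = (
        f"updated {resp.get('updated', 0)} of {resp.get('total', 0)} "
        f"in {resp.get('batches', 0)} batch(es), "
        f"{resp.get('version_conflicts', 0)} version conflict(s), "
        f"{resp.get('noops', 0)} noop(s)"
    )
    return True, message


# Illustrative values only, not output from a real cluster.
sample = {
    "took": 147,
    "timed_out": False,
    "total": 120,
    "updated": 120,
    "deleted": 0,
    "batches": 1,
    "version_conflicts": 0,
    "noops": 0,
    "failures": [],
}
print(summarize(sample))
```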

Examples

The simplest usage of _update_by_query just performs an update on every document in the data stream or index without changing the source. This is useful to pick up a new property or some other online mapping change.

To update selected documents, specify a query in the request body:

resp = client.update_by_query(
    index="my-index-000001",
    conflicts="proceed",
    body={"query": {"term": {"user.id": "kimchy"}}},
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  conflicts: 'proceed',
  body: {
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    }
  }
)
puts response
POST my-index-000001/_update_by_query?conflicts=proceed
{
  "query": { 
    "term": {
      "user.id": "kimchy"
    }
  }
}

The query must be passed as a value to the query key, in the same way as the Search API. You can also use the q parameter in the same way as the Search API.

Update documents in multiple data streams or indices:

resp = client.update_by_query(
    index=["my-index-000001", "my-index-000002"],
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001,my-index-000002'
)
puts response
POST my-index-000001,my-index-000002/_update_by_query

Limit the update by query operation to shards that match a particular routing value:

resp = client.update_by_query(
    index="my-index-000001",
    routing="1",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  routing: 1
)
puts response
POST my-index-000001/_update_by_query?routing=1

By default update by query uses scroll batches of 1000. You can change the batch size with the scroll_size parameter:

resp = client.update_by_query(
    index="my-index-000001",
    scroll_size="100",
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  scroll_size: 100
)
puts response
POST my-index-000001/_update_by_query?scroll_size=100

Update a document using a unique attribute:

resp = client.update_by_query(
    index="my-index-000001",
    body={"query": {"term": {"user.id": "kimchy"}}, "max_docs": 1},
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    },
    max_docs: 1
  }
)
puts response
POST my-index-000001/_update_by_query
{
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  },
  "max_docs": 1
}

Update the document source

Update by query supports scripts to update the document source. For example, the following request increments the count field for all documents with a user.id of kimchy in my-index-000001:

resp = client.update_by_query(
    index="my-index-000001",
    body={
        "script": {"source": "ctx._source.count++", "lang": "painless"},
        "query": {"term": {"user.id": "kimchy"}},
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    script: {
      source: 'ctx._source.count++',
      lang: 'painless'
    },
    query: {
      term: {
        'user.id' => 'kimchy'
      }
    }
  }
)
puts response
POST my-index-000001/_update_by_query
{
  "script": {
    "source": "ctx._source.count++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user.id": "kimchy"
    }
  }
}

Note that conflicts=proceed is not specified in this example. In this case, a version conflict should halt the process so you can handle the failure.

As with the Update API, you can set ctx.op to change the operation that is performed:

noop

Set ctx.op = "noop" if your script decides that it doesn’t have to make any changes. The update by query operation skips updating the document and increments the noop counter.

delete

Set ctx.op = "delete" if your script decides that the document should be deleted. The update by query operation deletes the document and increments the deleted counter.

Update by query only supports update, noop, and delete. Setting ctx.op to anything else is an error. Setting any other field in ctx is an error. This API only enables you to modify the source of matching documents; you cannot move them.
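
A sketch of a request body whose script chooses between update, noop, and delete. The count field comes from the example above; the limit parameter, the threshold logic, and the build_body helper are hypothetical and only illustrate how ctx.op might be set.

```python
# Painless source: skip documents without a count, delete those at or
# above the limit, and increment the rest. The threshold rule is made up.
SCRIPT_SOURCE = """
if (ctx._source.count == null) {
  ctx.op = 'noop';
} else if (ctx._source.count >= params.limit) {
  ctx.op = 'delete';
} else {
  ctx._source.count++;
}
"""


def build_body(user_id, limit):
    """Build an update by query body that sets ctx.op conditionally."""
    return {
        "script": {
            "source": SCRIPT_SOURCE,
            "lang": "painless",
            "params": {"limit": limit},
        },
        "query": {"term": {"user.id": user_id}},
    }


body = build_body("kimchy", limit=10)
print(body["script"]["params"])  # {'limit': 10}

# Against a real cluster (assumes a connected `client`):
# resp = client.update_by_query(index="my-index-000001", body=body)
# print(resp["updated"], resp["noops"], resp["deleted"])
```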

Update documents using an ingest pipeline

Update by query can use the Ingest pipelines feature by specifying a pipeline:

resp = client.ingest.put_pipeline(
    id="set-foo",
    body={
        "description": "sets foo",
        "processors": [{"set": {"field": "foo", "value": "bar"}}],
    },
)
print(resp)

resp = client.update_by_query(
    index="my-index-000001",
    pipeline="set-foo",
)
print(resp)
response = client.ingest.put_pipeline(
  id: 'set-foo',
  body: {
    description: 'sets foo',
    processors: [
      {
        set: {
          field: 'foo',
          value: 'bar'
        }
      }
    ]
  }
)
puts response

response = client.update_by_query(
  index: 'my-index-000001',
  pipeline: 'set-foo'
)
puts response
PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST my-index-000001/_update_by_query?pipeline=set-foo

Get the status of update by query operations

You can fetch the status of all running update by query requests with the Task API:

$response = $client->tasks()->list(['detailed' => true, 'actions' => '*byquery']);
resp = client.tasks.list(
    detailed="true",
    actions="*byquery",
)
print(resp)
response = client.tasks.list(
  detailed: true,
  actions: '*byquery'
)
puts response
res, err := es.Tasks.List(
	es.Tasks.List.WithActions("*byquery"),
	es.Tasks.List.WithDetailed(true),
)
fmt.Println(res, err)
const response = await client.tasks.list({
  detailed: 'true',
  actions: '*byquery'
})
console.log(response)
GET _tasks?detailed=true&actions=*byquery

The response looks like:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            },
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

The status object contains the actual status. It is just like the response JSON with the important addition of the total field. total is the total number of operations that the update by query expects to perform. You can estimate the progress by adding the updated, created, and deleted fields. The request will finish when their sum is equal to the total field.
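
The progress estimate described above can be computed from a Task API response like the one shown. iter_statuses and progress are hypothetical helper names; the dictionary layout follows the example response.

```python
def iter_statuses(tasks_response):
    """Yield (task_id, status) for every task in a Task API response."""
    for node in tasks_response.get("nodes", {}).values():
        for task_id, task in node.get("tasks", {}).items():
            yield task_id, task["status"]


def progress(status):
    """Fraction of expected operations done: (updated+created+deleted)/total."""
    total = status["total"]
    if total == 0:
        return 1.0  # nothing to do counts as finished
    done = status["updated"] + status["created"] + status["deleted"]
    return done / total


# Trimmed copy of the example response above.
response = {
    "nodes": {
        "r1A2WoRbTwKZ516z6NEs5A": {
            "tasks": {
                "r1A2WoRbTwKZ516z6NEs5A:36619": {
                    "status": {
                        "total": 6154,
                        "updated": 3500,
                        "created": 0,
                        "deleted": 0,
                    }
                }
            }
        }
    }
}

for task_id, status in iter_statuses(response):
    print(task_id, f"{progress(status):.1%}")
```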

With the task id you can look up the task directly. The following example retrieves information about task r1A2WoRbTwKZ516z6NEs5A:36619:

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->get($params);
resp = client.tasks.get(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
response = client.tasks.get(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
res, err := es.Tasks.Get(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
)
fmt.Println(res, err)
const response = await client.tasks.get({
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
})
console.log(response)
GET /_tasks/r1A2WoRbTwKZ516z6NEs5A:36619

The advantage of this API is that it integrates with wait_for_completion=false to transparently return the status of completed tasks. If the task is completed and wait_for_completion=false was set on it, then it’ll come back with a results or an error field. The cost of this feature is the document that wait_for_completion=false creates at .tasks/task/${taskId}. It is up to you to delete that document.

Cancel an update by query operation

Any update by query can be cancelled using the Task Cancel API:

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
];
$response = $client->tasks()->cancel($params);
resp = client.tasks.cancel(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
)
print(resp)
response = client.tasks.cancel(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
)
puts response
res, err := es.Tasks.Cancel(
	es.Tasks.Cancel.WithTaskID("r1A2WoRbTwKZ516z6NEs5A:36619"),
)
fmt.Println(res, err)
const response = await client.tasks.cancel({
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619'
})
console.log(response)
POST _tasks/r1A2WoRbTwKZ516z6NEs5A:36619/_cancel

The task ID can be found using the tasks API.

Cancellation should happen quickly but might take a few seconds. The task status API above will continue to list the update by query task until this task checks that it has been cancelled and terminates itself.

Change throttling for a request

The value of requests_per_second can be changed on a running update by query using the _rethrottle API:

$params = [
    'task_id' => 'r1A2WoRbTwKZ516z6NEs5A:36619',
    'requests_per_second' => -1,
];
$response = $client->updateByQueryRethrottle($params);
resp = client.update_by_query_rethrottle(
    task_id="r1A2WoRbTwKZ516z6NEs5A:36619",
    requests_per_second="-1",
)
print(resp)
response = client.update_by_query_rethrottle(
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  requests_per_second: -1
)
puts response
res, err := es.UpdateByQueryRethrottle(
	"r1A2WoRbTwKZ516z6NEs5A:36619",
	esapi.IntPtr(-1),
)
fmt.Println(res, err)
const response = await client.updateByQueryRethrottle({
  task_id: 'r1A2WoRbTwKZ516z6NEs5A:36619',
  requests_per_second: '-1'
})
console.log(response)
POST _update_by_query/r1A2WoRbTwKZ516z6NEs5A:36619/_rethrottle?requests_per_second=-1

The task ID can be found using the tasks API.

Just like when setting it on the _update_by_query API, requests_per_second can be either -1 to disable throttling or any decimal number like 1.7 or 12 to throttle to that level. Rethrottling that speeds up the query takes effect immediately, but rethrottling that slows down the query takes effect after completing the current batch. This prevents scroll timeouts.

Slice manually

Slice an update by query manually by providing a slice id and total number of slices to each request:

resp = client.update_by_query(
    index="my-index-000001",
    body={
        "slice": {"id": 0, "max": 2},
        "script": {"source": "ctx._source['extra'] = 'test'"},
    },
)
print(resp)

resp = client.update_by_query(
    index="my-index-000001",
    body={
        "slice": {"id": 1, "max": 2},
        "script": {"source": "ctx._source['extra'] = 'test'"},
    },
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    slice: {
      id: 0,
      max: 2
    },
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response

response = client.update_by_query(
  index: 'my-index-000001',
  body: {
    slice: {
      id: 1,
      max: 2
    },
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response
POST my-index-000001/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}
POST my-index-000001/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

You can verify that this works with:

resp = client.indices.refresh()
print(resp)

resp = client.search(
    index="my-index-000001",
    size="0",
    q="extra:test",
    filter_path="hits.total",
)
print(resp)
response = client.indices.refresh
puts response

response = client.search(
  index: 'my-index-000001',
  size: 0,
  q: 'extra:test',
  filter_path: 'hits.total'
)
puts response
GET _refresh
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

This results in a sensible total like this one:

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}
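
The per-slice request bodies used in the manual slicing example differ only in the slice id, so they can be generated in a loop. slice_bodies is a hypothetical helper; requiring at least two slices is an assumption made here because a single slice is the same as not slicing.

```python
def slice_bodies(max_slices, script_source):
    """Build one update by query body per slice id in [0, max_slices)."""
    if max_slices < 2:
        raise ValueError("use at least 2 slices when slicing manually")
    return [
        {
            "slice": {"id": slice_id, "max": max_slices},
            "script": {"source": script_source},
        }
        for slice_id in range(max_slices)
    ]


bodies = slice_bodies(2, "ctx._source['extra'] = 'test'")
for body in bodies:
    print(body["slice"])

# Against a real cluster (assumes a connected `client`), one request per slice:
# for body in bodies:
#     print(client.update_by_query(index="my-index-000001", body=body))
```

Each body is an independent request, so the requests can be sent from separate workers.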

Use automatic slicing

You can also let update by query automatically parallelize using Sliced scroll to slice on _id. Use slices to specify the number of slices to use:

resp = client.update_by_query(
    index="my-index-000001",
    refresh=True,
    slices="5",
    body={"script": {"source": "ctx._source['extra'] = 'test'"}},
)
print(resp)
response = client.update_by_query(
  index: 'my-index-000001',
  refresh: true,
  slices: 5,
  body: {
    script: {
      source: "ctx._source['extra'] = 'test'"
    }
  }
)
puts response
POST my-index-000001/_update_by_query?refresh&slices=5
{
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

You can also verify that this works with:

resp = client.search(
    index="my-index-000001",
    size="0",
    q="extra:test",
    filter_path="hits.total",
)
print(resp)
response = client.search(
  index: 'my-index-000001',
  size: 0,
  q: 'extra:test',
  filter_path: 'hits.total'
)
puts response
POST my-index-000001/_search?size=0&q=extra:test&filter_path=hits.total

This results in a sensible total like this one:

{
  "hits": {
    "total": {
        "value": 120,
        "relation": "eq"
    }
  }
}

Setting slices to auto will let Elasticsearch choose the number of slices to use. This setting will use one slice per shard, up to a certain limit. If there are multiple source data streams or indices, it will choose the number of slices based on the index or backing index with the smallest number of shards.

Adding slices to _update_by_query just automates the manual process used in the section above by creating sub-requests, which means it has some quirks:

  • You can see these requests in the Tasks APIs. These sub-requests are "child" tasks of the task for the request with slices.
  • Fetching the status of the task for the request with slices only contains the status of completed slices.
  • These sub-requests are individually addressable for things like cancellation and rethrottling.
  • Rethrottling the request with slices will rethrottle the unfinished sub-request proportionally.
  • Canceling the request with slices will cancel each sub-request.
  • Due to the nature of slices each sub-request won’t get a perfectly even portion of the documents. All documents will be addressed, but some slices may be larger than others. Expect larger slices to have a more even distribution.
  • Parameters like requests_per_second and max_docs on a request with slices are distributed proportionally to each sub-request. Combine that with the point above about distribution being uneven and you should conclude that using max_docs with slices might not result in exactly max_docs documents being updated.
  • Each sub-request gets a slightly different snapshot of the source data stream or index though these are all taken at approximately the same time.
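
The interaction between max_docs and uneven slices noted in the list above can be seen with a small calculation. This is a simplified model that assumes each sub-request receives an equal share of max_docs; split_max_docs and updated_total are hypothetical names, and the real distribution inside Elasticsearch may differ.

```python
def split_max_docs(max_docs, slices):
    """Assumed even share of max_docs handed to each sub-request."""
    return max_docs // slices


def updated_total(matching_per_slice, max_docs):
    """Documents updated when each slice is capped at its share."""
    share = split_max_docs(max_docs, len(matching_per_slice))
    # A slice cannot update more documents than it matches,
    # and cannot exceed its share of max_docs.
    return sum(min(matching, share) for matching in matching_per_slice)


# Two slices that match 10 and 90 documents, with max_docs=100:
print(updated_total([10, 90], max_docs=100))  # 60
# Evenly sized slices reach the limit:
print(updated_total([50, 50], max_docs=100))  # 100
```

In the uneven case the small slice runs out of documents while the large slice is capped at its share, so fewer than max_docs documents are updated.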

Pick up a new property

Say you created an index without dynamic mapping, filled it with data, and then added a mapping value to pick up more fields from the data:

$params = [
    'index' => 'test',
    'body' => [
        'mappings' => [
            'dynamic' => false,
            'properties' => [
                'text' => [
                    'type' => 'text',
                ],
            ],
        ],
    ],
];
$response = $client->indices()->create($params);
$params = [
    'index' => 'test',
    'body' => [
        'text' => 'words words',
        'flag' => 'bar',
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'test',
    'body' => [
        'text' => 'words words',
        'flag' => 'foo',
    ],
];
$response = $client->index($params);
$params = [
    'index' => 'test',
    'body' => [
        'properties' => [
            'text' => [
                'type' => 'text',
            ],
            'flag' => [
                'type' => 'text',
                'analyzer' => 'keyword',
            ],
        ],
    ],
];
$response = $client->indices()->putMapping($params);
resp = client.indices.create(
    index="test",
    body={
        "mappings": {
            "dynamic": False,
            "properties": {"text": {"type": "text"}},
        }
    },
)
print(resp)

resp = client.index(
    index="test",
    refresh=True,
    body={"text": "words words", "flag": "bar"},
)
print(resp)

resp = client.index(
    index="test",
    refresh=True,
    body={"text": "words words", "flag": "foo"},
)
print(resp)

resp = client.indices.put_mapping(
    index="test",
    body={
        "properties": {
            "text": {"type": "text"},
            "flag": {"type": "text", "analyzer": "keyword"},
        }
    },
)
print(resp)
response = client.indices.create(
  index: 'test',
  body: {
    mappings: {
      dynamic: false,
      properties: {
        text: {
          type: 'text'
        }
      }
    }
  }
)
puts response

response = client.index(
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'bar'
  }
)
puts response

response = client.index(
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'foo'
  }
)
puts response

response = client.indices.put_mapping(
  index: 'test',
  body: {
    properties: {
      text: {
        type: 'text'
      },
      flag: {
        type: 'text',
        analyzer: 'keyword'
      }
    }
  }
)
puts response
{
	res, err := es.Indices.Create(
		"test",
		es.Indices.Create.WithBody(strings.NewReader(`{
	  "mappings": {
	    "dynamic": false,
	    "properties": {
	      "text": {
	        "type": "text"
	      }
	    }
	  }
	}`)),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"test",
		strings.NewReader(`{
	  "text": "words words",
	  "flag": "bar"
	}`),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Index(
		"test",
		strings.NewReader(`{
	  "text": "words words",
	  "flag": "foo"
	}`),
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Indices.PutMapping(
		[]string{"test"},
		strings.NewReader(`{
	  "properties": {
	    "text": {
	      "type": "text"
	    },
	    "flag": {
	      "type": "text",
	      "analyzer": "keyword"
	    }
	  }
	}`),
	)
	fmt.Println(res, err)
}
const response0 = await client.indices.create({
  index: 'test',
  body: {
    mappings: {
      dynamic: false,
      properties: {
        text: {
          type: 'text'
        }
      }
    }
  }
})
console.log(response0)

const response1 = await client.index({
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'bar'
  }
})
console.log(response1)

const response2 = await client.index({
  index: 'test',
  refresh: true,
  body: {
    text: 'words words',
    flag: 'foo'
  }
})
console.log(response2)

const response3 = await client.indices.putMapping({
  index: 'test',
  body: {
    properties: {
      text: {
        type: 'text'
      },
      flag: {
        type: 'text',
        analyzer: 'keyword'
      }
    }
  }
})
console.log(response3)
PUT test
{
  "mappings": {
    "dynamic": false,   
    "properties": {
      "text": {"type": "text"}
    }
  }
}

POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping   
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

Setting dynamic to false when creating the index means that new fields won’t be indexed, just stored in _source.

The final PUT test/_mapping request updates the mapping to add the new flag field. To pick up the new field you have to reindex all documents with it.

Searching for the data won’t find anything:

$params = [
    'index' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'flag' => 'foo',
            ],
        ],
    ],
];
$response = $client->search($params);
resp = client.search(
    index="test",
    filter_path="hits.total",
    body={"query": {"match": {"flag": "foo"}}},
)
print(resp)
response = client.search(
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
)
puts response
res, err := es.Search(
	es.Search.WithIndex("test"),
	es.Search.WithBody(strings.NewReader(`{
	  "query": {
	    "match": {
	      "flag": "foo"
	    }
	  }
	}`)),
	es.Search.WithFilterPath("hits.total"),
	es.Search.WithPretty(),
)
fmt.Println(res, err)
const response = await client.search({
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
})
console.log(response)
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total": {
        "value": 0,
        "relation": "eq"
    }
  }
}

But you can issue an _update_by_query request to pick up the new mapping:

$params = [
    'index' => 'test',
    'refresh' => true,
    'conflicts' => 'proceed',
];
$response = $client->updateByQuery($params);
$params = [
    'index' => 'test',
    'body' => [
        'query' => [
            'match' => [
                'flag' => 'foo',
            ],
        ],
    ],
];
$response = $client->search($params);
resp = client.update_by_query(
    index="test",
    refresh=True,
    conflicts="proceed",
)
print(resp)

resp = client.search(
    index="test",
    filter_path="hits.total",
    body={"query": {"match": {"flag": "foo"}}},
)
print(resp)
response = client.update_by_query(
  index: 'test',
  refresh: true,
  conflicts: 'proceed'
)
puts response

response = client.search(
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
)
puts response
{
	res, err := es.UpdateByQuery(
		[]string{"test"},
		es.UpdateByQuery.WithConflicts("proceed"),
		es.UpdateByQuery.WithRefresh(true),
	)
	fmt.Println(res, err)
}

{
	res, err := es.Search(
		es.Search.WithIndex("test"),
		es.Search.WithBody(strings.NewReader(`{
	  "query": {
	    "match": {
	      "flag": "foo"
	    }
	  }
	}`)),
		es.Search.WithFilterPath("hits.total"),
		es.Search.WithPretty(),
	)
	fmt.Println(res, err)
}
const response0 = await client.updateByQuery({
  index: 'test',
  refresh: true,
  conflicts: 'proceed'
})
console.log(response0)

const response1 = await client.search({
  index: 'test',
  filter_path: 'hits.total',
  body: {
    query: {
      match: {
        flag: 'foo'
      }
    }
  }
})
console.log(response1)
POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total": {
        "value": 1,
        "relation": "eq"
    }
  }
}

You can do the exact same thing when adding a field to a multifield.