Use a data streamedit

After you set up a data stream, you can do the following:

Add documents to a data streamedit

You can add documents to a data stream using two types of indexing requests:

Adding a document to a data stream adds the document to stream’s current write index.

You cannot add new documents to a stream’s other backing indices, even by sending requests directly to the index. This means you cannot submit the following requests directly to any backing index except the write index:

  • An index API request with an op_type of create. The op_type parameter defaults to create when adding new documents.
  • A bulk API request using a create action

Individual indexing requestsedit

You can use an index API request with an op_type of create to add individual documents to a data stream.

The op_type parameter defaults to create when adding new documents.

The following index API request adds a new document to my-data-stream.

POST /my-data-stream/_doc/
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

You cannot add new documents to a data stream using the index API’s PUT /<target>/_doc/<_id> request format. To specify a document ID, use the PUT /<target>/_create/<_id> format instead.

Bulk indexing requestsedit

You can use the bulk API to add multiple documents to a data stream in a single request. Each action in the bulk request must use the create action.

Data streams do not support other bulk actions, such as index.

The following bulk API request adds several new documents to my-data-stream. Only the create action is used.

PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }

Index with an ingest pipelineedit

You can use an ingest pipeline with an indexing request to pre-process data before it’s indexed to a data stream.

The following put pipeline API request creates the lowercase_message_field ingest pipeline. The pipeline uses the lowercase ingest processor to change the message field value to lowercase before indexing.

PUT /_ingest/pipeline/lowercase_message_field
{
  "description" : "Lowercases the message field value",
  "processors" : [
    {
      "lowercase" : {
        "field" : "message"
      }
    }
  ]
}

The following index API request adds a new document to my-data-stream.

The request includes a ?pipeline=lowercase_message_field query parameter. This parameter indicates Elasticsearch should use the lowercase_message_field pipeline to pre-process the document before indexing it.

During pre-processing, the pipeline changes the letter case of the document’s message field value from LOGIN Successful to login successful.

POST /my-data-stream/_doc?pipeline=lowercase_message_field
{
  "@timestamp": "2020-12-08T11:12:01.000Z",
  "user": {
    "id": "I1YBEOxJ"
  },
  "message": "LOGIN Successful"
}

Search a data streamedit

The following search APIs support data streams:

The following search API request searches my-data-stream for documents with a timestamp between today and yesterday that also have message value of login successful.

GET /my-data-stream/_search
{
  "query": {
    "bool": {
      "must": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      },
      "should": {
        "match": {
          "message": "login successful"
        }
      }
    }
  }
}

You can use a comma-separated list to search multiple data streams, indices, and index aliases in the same request.

The following request searches my-data-stream and my-data-stream-alt, which are specified as a comma-separated list in the request path.

GET /my-data-stream,my-data-stream-alt/_search
{
  "query": {
    "match": {
      "user.id": "8a4f500d"
    }
  }
}

Index patterns are also supported.

The following request uses the my-data-stream* index pattern to search any data stream, index, or index alias beginning with my-data-stream.

GET /my-data-stream*/_search
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

The following search request omits a target in the request path. The request searches all data streams and indices in the cluster.

GET /_search
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  }
}

Get statistics for a data streamedit

You can use the data stream stats API to retrieve statistics for one or more data streams. These statistics include:

  • A count of the stream’s backing indices
  • The total store size of all shards for the stream’s backing indices
  • The highest @timestamp value for the stream
Example

The following data stream stats API request retrieves statistics for my-data-stream.

GET /_data_stream/my-data-stream/_stats?human=true

The API returns the following response.

{
  "_shards": {
    "total": 6,
    "successful": 3,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 3,
  "total_store_size": "624b",
  "total_store_size_bytes": 624,
  "data_streams": [
    {
      "data_stream": "my-data-stream",
      "backing_indices": 3,
      "store_size": "624b",
      "store_size_bytes": 624,
      "maximum_timestamp": 1607339167000
    }
  ]
}

Manually roll over a data streamedit

A rollover creates a new backing index for a data stream. This new backing index becomes the stream’s write index and increments the stream’s generation.

In most cases, we recommend using ILM to automate rollovers for data streams. This lets you automatically roll over the current write index when it meets specified criteria, such as a maximum age or size.

However, you can also use the rollover API to manually perform a rollover. This can be useful if you want to apply mapping or setting changes to the stream’s write index after updating a data stream’s template.

The following rollover API request submits a manual rollover request for my-data-stream.

POST /my-data-stream/_rollover/

Open closed backing indicesedit

You may close one or more of a data stream’s backing indices as part of its ILM lifecycle or another workflow. A closed backing index cannot be searched, even for searches targeting its data stream. You also can’t update or delete documents in a closed index.

You can re-open individual backing indices by sending an open request directly to the index.

You also can conveniently re-open all closed backing indices for a data stream by sending an open request directly to the stream.

The following cat indices API request retrieves the status for my-data-stream's backing indices.

GET /_cat/indices/my-data-stream?v&s=index&h=index,status

The API returns the following response. The response indicates my-data-stream contains two closed backing indices: .ds-my-data-stream-000001 and .ds-my-data-stream-000002.

index                     status
.ds-my-data-stream-000001 close
.ds-my-data-stream-000002 close
.ds-my-data-stream-000003 open

The following open API request re-opens any closed backing indices for my-data-stream, including .ds-my-data-stream-000001 and .ds-my-data-stream-000002.

POST /my-data-stream/_open/

You can resubmit the original cat indices API request to verify .ds-my-data-stream-000001 and .ds-my-data-stream-000002 were re-opened.

GET /_cat/indices/my-data-stream?v&s=index&h=index,status

The API returns the following response.

index                     status
.ds-my-data-stream-000001 open
.ds-my-data-stream-000002 open
.ds-my-data-stream-000003 open

Reindex with a data streamedit

You can use the reindex API to copy documents to a data stream from an existing index, index alias, or data stream.

A reindex copies documents from a source to a destination. The source and destination can be any pre-existing index, index alias, or data stream. However, the source and destination must be different. You cannot reindex a data stream into itself.

Because data streams are append-only, a reindex request to a data stream destination must have an op_type of create. This means a reindex can only add new documents to a data stream. It cannot update existing documents in the data stream destination.

A reindex can be used to:

  • Convert an existing index alias and collection of time-based indices into a data stream.
  • Apply a new or updated index template by reindexing an existing data stream into a new one. This applies mapping and setting changes in the template to each document and backing index of the data stream destination. See Use reindex to change mappings or settings.

If you only want to update the mappings or settings of a data stream’s write index, we recommend you update the data stream’s template and perform a rollover.

The following reindex request copies documents from the archive index alias to my-data-stream. Because the destination is a data stream, the request’s op_type is create.

POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "my-data-stream",
    "op_type": "create"
  }
}

You can also reindex documents from a data stream to an index, index alias, or data stream.

The following reindex request copies documents from my-data-stream to the existing archive index alias. Because the destination is not a data stream, the op_type does not need to be specified.

POST /_reindex
{
  "source": {
    "index": "my-data-stream"
  },
  "dest": {
    "index": "archive"
  }
}

Update documents in a data stream by queryedit

You cannot send indexing or update requests for existing documents directly to a data stream. These prohibited requests include:

  • An index API request with an op_type of index. The op_type parameter defaults to index for existing documents.
  • A bulk API request using the index or update action.

Instead, you can use the update by query API to update documents in a data stream that matches a provided query.

The following update by query request updates documents in my-data-stream with a user.id of l7gk7f82. The request uses a script to assign matching documents a new user.id value of XgdX0NoX.

POST /my-data-stream/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}

Delete documents in a data stream by queryedit

You cannot send document deletion requests directly to a data stream. These prohibited requests include:

Instead, you can use the delete by query API to delete documents in a data stream that matches a provided query.

The following delete by query request deletes documents in my-data-stream with a user.id of vlb44hny.

POST /my-data-stream/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}

Update or delete documents in a backing indexedit

Alternatively, you can update or delete documents in a data stream by sending the update or deletion request to the backing index containing the document. To do this, you first need to get:

  • The document ID
  • The name of the backing index that contains the document

If you want to update a document, you must also get its current sequence number and primary term.

You can use a search request to retrieve this information.

The following search request retrieves documents in my-data-stream with a user.id of yWIumJd7. By default, this search returns the document ID and backing index for any matching documents.

The request includes a "seq_no_primary_term": true argument. This means the search also returns the sequence number and primary term for any matching documents.

GET /my-data-stream/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}

The API returns the following response. The hits.hits property contains information for any documents matching the search.

{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-my-data-stream-000003",      
        "_id": "bfspvnIBr7VVZlfp2lqX",              
        "_seq_no": 0,                               
        "_primary_term": 1,                         
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2020-12-07T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}

Backing index containing the matching document

Document ID for the document

Current sequence number for the document

Primary term for the document

You can use an index API request to update an individual document. To prevent an accidental overwrite, this request must include valid if_seq_no and if_primary_term arguments.

The following index API request updates an existing document in my-data-stream. The request targets document ID bfspvnIBr7VVZlfp2lqX in the .ds-my-data-stream-000003 backing index.

The request also includes the current sequence number and primary term in the respective if_seq_no and if_primary_term query parameters. The request body contains a new JSON source for the document.

PUT /.ds-my-data-stream-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}

You use the delete API to delete individual documents. Deletion requests do not require a sequence number or primary term.

The following index API request deletes an existing document in my-data-stream. The request targets document ID bfspvnIBr7VVZlfp2lqX in the .ds-my-data-stream-000003 backing index.

DELETE /.ds-my-data-stream-000003/_doc/bfspvnIBr7VVZlfp2lqX

You can use the bulk API to delete or update multiple documents in one request using delete, index, or update actions.

If the action type is index, the action must include valid if_seq_no and if_primary_term arguments.

The following bulk API request uses an index action to update an existing document in my-data-stream.

The index action targets document ID bfspvnIBr7VVZlfp2lqX in the .ds-my-data-stream-000003 backing index. The action also includes the current sequence number and primary term in the respective if_seq_no and if_primary_term parameters.

PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }