Use a data stream
Use a data stream
After you set up a data stream, you can do the following:
- Add documents to a data stream
- Search a data stream
- Get statistics for a data stream
- Manually roll over a data stream
- Open closed backing indices
- Reindex with a data stream
- Update documents in a data stream by query
- Delete documents in a data stream by query
- Update or delete documents in a backing index
Add documents to a data stream
To add an individual document, use the index API. Ingest pipelines are supported.
resp = client.index(
index="my-data-stream",
document={
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
},
)
print(resp)
response = client.index(
index: 'my-data-stream',
body: {
"@timestamp": '2099-03-08T11:06:07.000Z',
user: {
id: '8a4f500d'
},
message: 'Login successful'
}
)
puts response
const response = await client.index({
index: "my-data-stream",
document: {
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
});
console.log(response);
POST /my-data-stream/_doc/
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
You cannot add new documents to a data stream using the index API’s PUT
/<target>/_doc/<_id> request format. To specify a document ID, use the PUT
/<target>/_create/<_id> format instead. Only an
op_type of create is supported.
To add multiple documents with a single request, use the bulk API.
Only create actions are supported.
resp = client.bulk(
index="my-data-stream",
refresh=True,
operations=[
{
"create": {}
},
{
"@timestamp": "2099-03-08T11:04:05.000Z",
"user": {
"id": "vlb44hny"
},
"message": "Login attempt failed"
},
{
"create": {}
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
},
{
"create": {}
},
{
"@timestamp": "2099-03-09T11:07:08.000Z",
"user": {
"id": "l7gk7f82"
},
"message": "Logout successful"
}
],
)
print(resp)
response = client.bulk(
index: 'my-data-stream',
refresh: true,
body: [
{
create: {}
},
{
"@timestamp": '2099-03-08T11:04:05.000Z',
user: {
id: 'vlb44hny'
},
message: 'Login attempt failed'
},
{
create: {}
},
{
"@timestamp": '2099-03-08T11:06:07.000Z',
user: {
id: '8a4f500d'
},
message: 'Login successful'
},
{
create: {}
},
{
"@timestamp": '2099-03-09T11:07:08.000Z',
user: {
id: 'l7gk7f82'
},
message: 'Logout successful'
}
]
)
puts response
const response = await client.bulk({
index: "my-data-stream",
refresh: "true",
operations: [
{
create: {},
},
{
"@timestamp": "2099-03-08T11:04:05.000Z",
user: {
id: "vlb44hny",
},
message: "Login attempt failed",
},
{
create: {},
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
{
create: {},
},
{
"@timestamp": "2099-03-09T11:07:08.000Z",
user: {
id: "l7gk7f82",
},
message: "Logout successful",
},
],
});
console.log(response);
PUT /my-data-stream/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2099-03-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2099-03-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
Search a data stream
The following search APIs support data streams:
- Search
- Async search
- Multi search
- Field capabilities
- EQL search
Get statistics for a data stream
Use the data stream stats API to get statistics for one or more data streams:
resp = client.indices.data_streams_stats(
name="my-data-stream",
human=True,
)
print(resp)
response = client.indices.data_streams_stats(
name: 'my-data-stream',
human: true
)
puts response
const response = await client.indices.dataStreamsStats({
name: "my-data-stream",
human: "true",
});
console.log(response);
GET /_data_stream/my-data-stream/_stats?human=true
Manually roll over a data stream
Use the rollover API to manually roll over a data stream. You have two options when manually rolling over:
- To immediately trigger a rollover:
resp = client.indices.rollover(
alias="my-data-stream",
)
print(resp)
response = client.indices.rollover(
alias: 'my-data-stream'
)
puts response
const response = await client.indices.rollover({
alias: "my-data-stream",
});
console.log(response);
POST /my-data-stream/_rollover/
- Or to postpone the rollover until the next indexing event occurs:
resp = client.indices.rollover(
alias="my-data-stream",
lazy=True,
)
print(resp)
response = client.indices.rollover(
alias: 'my-data-stream',
lazy: true
)
puts response
const response = await client.indices.rollover({
alias: "my-data-stream",
lazy: "true",
});
console.log(response);
POST /my-data-stream/_rollover?lazy
Use the second option (a lazy rollover) to avoid having empty backing indices in data streams that do not get updated often.
Open closed backing indices
You cannot search a closed backing index, even by searching its data stream. You also cannot update or delete documents in a closed index.
To re-open a closed backing index, submit an open index API request directly to the index:
resp = client.indices.open(
index=".ds-my-data-stream-2099.03.07-000001",
)
print(resp)
response = client.indices.open(
index: '.ds-my-data-stream-2099.03.07-000001'
)
puts response
const response = await client.indices.open({
index: ".ds-my-data-stream-2099.03.07-000001",
});
console.log(response);
POST /.ds-my-data-stream-2099.03.07-000001/_open/
To re-open all closed backing indices for a data stream, submit an open index API request to the stream:
resp = client.indices.open(
index="my-data-stream",
)
print(resp)
response = client.indices.open(
index: 'my-data-stream'
)
puts response
const response = await client.indices.open({
index: "my-data-stream",
});
console.log(response);
POST /my-data-stream/_open/
Reindex with a data stream
Use the reindex API to copy documents from an existing index,
alias, or data stream to a data stream. Because data streams are
append-only, a reindex into a data stream must use
an op_type of create. A reindex cannot update existing documents in a data
stream.
resp = client.reindex(
source={
"index": "archive"
},
dest={
"index": "my-data-stream",
"op_type": "create"
},
)
print(resp)
response = client.reindex(
body: {
source: {
index: 'archive'
},
dest: {
index: 'my-data-stream',
op_type: 'create'
}
}
)
puts response
const response = await client.reindex({
source: {
index: "archive",
},
dest: {
index: "my-data-stream",
op_type: "create",
},
});
console.log(response);
POST /_reindex
{
"source": {
"index": "archive"
},
"dest": {
"index": "my-data-stream",
"op_type": "create"
}
}
Update documents in a data stream by query
Use the update by query API to update documents in a data stream that match a provided query:
resp = client.update_by_query(
index="my-data-stream",
query={
"match": {
"user.id": "l7gk7f82"
}
},
script={
"source": "ctx._source.user.id = params.new_id",
"params": {
"new_id": "XgdX0NoX"
}
},
)
print(resp)
response = client.update_by_query(
index: 'my-data-stream',
body: {
query: {
match: {
'user.id' => 'l7gk7f82'
}
},
script: {
source: 'ctx._source.user.id = params.new_id',
params: {
new_id: 'XgdX0NoX'
}
}
}
)
puts response
const response = await client.updateByQuery({
index: "my-data-stream",
query: {
match: {
"user.id": "l7gk7f82",
},
},
script: {
source: "ctx._source.user.id = params.new_id",
params: {
new_id: "XgdX0NoX",
},
},
});
console.log(response);
POST /my-data-stream/_update_by_query
{
"query": {
"match": {
"user.id": "l7gk7f82"
}
},
"script": {
"source": "ctx._source.user.id = params.new_id",
"params": {
"new_id": "XgdX0NoX"
}
}
}
Delete documents in a data stream by query
Use the delete by query API to delete documents in a data stream that match a provided query:
resp = client.delete_by_query(
index="my-data-stream",
query={
"match": {
"user.id": "vlb44hny"
}
},
)
print(resp)
response = client.delete_by_query(
index: 'my-data-stream',
body: {
query: {
match: {
'user.id' => 'vlb44hny'
}
}
}
)
puts response
const response = await client.deleteByQuery({
index: "my-data-stream",
query: {
match: {
"user.id": "vlb44hny",
},
},
});
console.log(response);
POST /my-data-stream/_delete_by_query
{
"query": {
"match": {
"user.id": "vlb44hny"
}
}
}
Update or delete documents in a backing index
If needed, you can update or delete documents in a data stream by sending requests to the backing index containing the document. You’ll need:
- The document ID
- The name of the backing index containing the document
- If updating the document, its sequence number and primary term
To get this information, use a search request:
resp = client.search(
index="my-data-stream",
seq_no_primary_term=True,
query={
"match": {
"user.id": "yWIumJd7"
}
},
)
print(resp)
response = client.search(
index: 'my-data-stream',
body: {
seq_no_primary_term: true,
query: {
match: {
'user.id' => 'yWIumJd7'
}
}
}
)
puts response
const response = await client.search({
index: "my-data-stream",
seq_no_primary_term: true,
query: {
match: {
"user.id": "yWIumJd7",
},
},
});
console.log(response);
GET /my-data-stream/_search
{
"seq_no_primary_term": true,
"query": {
"match": {
"user.id": "yWIumJd7"
}
}
}
Response:
{
"took": 20,
"timed_out": false,
"_shards": {
"total": 3,
"successful": 3,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 0.2876821,
"hits": [
{
"_index": ".ds-my-data-stream-2099.03.08-000003",
"_id": "bfspvnIBr7VVZlfp2lqX",
"_seq_no": 0,
"_primary_term": 1,
"_score": 0.2876821,
"_source": {
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "yWIumJd7"
},
"message": "Login successful"
}
}
]
}
}
In the response:
- "_index": Backing index containing the matching document
- "_id": Document ID for the document
- "_seq_no": Current sequence number for the document
- "_primary_term": Primary term for the document
To update the document, use an index API request with valid
if_seq_no and if_primary_term arguments:
resp = client.index(
index=".ds-my-data-stream-2099.03.08-000003",
id="bfspvnIBr7VVZlfp2lqX",
if_seq_no="0",
if_primary_term="1",
document={
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
},
)
print(resp)
const response = await client.index({
index: ".ds-my-data-stream-2099.03.08-000003",
id: "bfspvnIBr7VVZlfp2lqX",
if_seq_no: 0,
if_primary_term: 1,
document: {
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
});
console.log(response);
PUT /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
To delete the document, use the delete API:
resp = client.delete(
index=".ds-my-data-stream-2099.03.08-000003",
id="bfspvnIBr7VVZlfp2lqX",
)
print(resp)
response = client.delete(
index: '.ds-my-data-stream-2099.03.08-000003',
id: 'bfspvnIBr7VVZlfp2lqX'
)
puts response
const response = await client.delete({
index: ".ds-my-data-stream-2099.03.08-000003",
id: "bfspvnIBr7VVZlfp2lqX",
});
console.log(response);
DELETE /.ds-my-data-stream-2099.03.08-000003/_doc/bfspvnIBr7VVZlfp2lqX
To delete or update multiple documents with a single request, use the
bulk API's delete, index, and update actions. For index
actions, include valid if_seq_no and
if_primary_term arguments.
resp = client.bulk(
refresh=True,
operations=[
{
"index": {
"_index": ".ds-my-data-stream-2099.03.08-000003",
"_id": "bfspvnIBr7VVZlfp2lqX",
"if_seq_no": 0,
"if_primary_term": 1
}
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
],
)
print(resp)
response = client.bulk(
refresh: true,
body: [
{
index: {
_index: '.ds-my-data-stream-2099.03.08-000003',
_id: 'bfspvnIBr7VVZlfp2lqX',
if_seq_no: 0,
if_primary_term: 1
}
},
{
"@timestamp": '2099-03-08T11:06:07.000Z',
user: {
id: '8a4f500d'
},
message: 'Login successful'
}
]
)
puts response
const response = await client.bulk({
refresh: "true",
operations: [
{
index: {
_index: ".ds-my-data-stream-2099.03.08-000003",
_id: "bfspvnIBr7VVZlfp2lqX",
if_seq_no: 0,
if_primary_term: 1,
},
},
{
"@timestamp": "2099-03-08T11:06:07.000Z",
user: {
id: "8a4f500d",
},
message: "Login successful",
},
],
});
console.log(response);
PUT /_bulk?refresh
{ "index": { "_index": ".ds-my-data-stream-2099.03.08-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2099-03-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }