[role="xpack"]
[[use-a-data-stream]]
== Use a data stream
After you <<set-up-a-data-stream,set up a data stream>>, you can do
the following:
* <<add-documents-to-a-data-stream>>
* <<search-a-data-stream>>
* <<get-stats-for-a-data-stream>>
* <<manually-roll-over-a-data-stream>>
* <<open-closed-backing-indices>>
* <<reindex-with-a-data-stream>>
* <<update-delete-docs-in-a-data-stream>>
* <<update-delete-docs-in-a-backing-index>>
////
[source,console]
----
PUT /_index_template/logs_data_stream
{
  "index_patterns": [ "logs*" ],
  "data_stream": { }
}

PUT /_data_stream/logs

POST /logs/_rollover/

POST /logs/_rollover/

PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "yWIumJd7"
  },
  "message": "Login successful"
}

PUT /_data_stream/logs_alt
----
// TESTSETUP
[source,console]
----
DELETE /_data_stream/*
DELETE /_index_template/*
----
// TEARDOWN
////
[discrete]
[[add-documents-to-a-data-stream]]
=== Add documents to a data stream
You can add documents to a data stream using the following requests:
* An <<docs-index_,index API>> request with an
<<docs-index-api-op_type,`op_type`>> set to `create`. Specify the data
stream's name in place of an index name.
+
--
NOTE: The `op_type` parameter defaults to `create` when the request does not
specify a document ID, as in the `POST /<target>/_doc/` format.
.*Example: Index API request*
[%collapsible]
====
The following index API request adds a new document to the `logs` data
stream.
[source,console]
----
POST /logs/_doc/
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
====
IMPORTANT: You cannot add new documents to a data stream using the index API's
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead.
--
* A <<docs-bulk,bulk API>> request using the `create` action. Specify the data
stream's name in place of an index name.
+
--
NOTE: Data streams do not support other bulk actions, such as `index`.
.*Example: Bulk API request*
[%collapsible]
====
The following bulk API request adds several new documents to
the `logs` data stream. Note that only the `create` action is used.
[source,console]
----
PUT /logs/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
----
====
--
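As a rough illustration of the bulk request format described above, the
following Python sketch assembles a newline-delimited body that uses only
`create` actions. The `build_create_bulk_body` helper is a hypothetical name,
not part of any client library. The sketch only builds the request body; it
does not send it.

[source,python]
----
import json


def build_create_bulk_body(docs):
    """Return an NDJSON bulk body that uses only `create` actions.

    Hypothetical helper for illustration; not part of any client library.
    """
    lines = []
    for doc in docs:
        # Each document is preceded by its own `create` action line.
        lines.append(json.dumps({"create": {}}))
        lines.append(json.dumps(doc))
    # The bulk API expects the last line to end with a newline character.
    return "\n".join(lines) + "\n"


docs = [
    {
        "@timestamp": "2020-12-08T11:04:05.000Z",
        "user": {"id": "vlb44hny"},
        "message": "Login attempt failed",
    },
    {
        "@timestamp": "2020-12-08T11:06:07.000Z",
        "user": {"id": "8a4f500d"},
        "message": "Login successful",
    },
]
body = build_create_bulk_body(docs)
print(body, end="")
----

The resulting string could be sent as the body of a `/logs/_bulk` request with
any HTTP client.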
You can use an <<ingest,ingest pipeline>> with these requests to pre-process
data before it's indexed.
.*Example: Ingest pipeline*
[%collapsible]
====
The following <<put-pipeline-api,put pipeline API>> request creates the
`lowercase_message_field` ingest pipeline. The pipeline uses the
<<lowercase-processor,`lowercase` ingest processor>> to change the `message`
field value to lowercase before indexing.
[source,console]
----
PUT /_ingest/pipeline/lowercase_message_field
{
  "description" : "Lowercases the message field value",
  "processors" : [
    {
      "lowercase" : {
        "field" : "message"
      }
    }
  ]
}
----
// TEST[continued]
The following index API request adds a new document to the `logs` data stream.
The request includes a `?pipeline=lowercase_message_field` query parameter.
This parameter indicates {es} should use the `lowercase_message_field` pipeline
to pre-process the document before indexing it.
During pre-processing, the pipeline changes the letter case of the document's
`message` field value from `LOGIN Successful` to `login successful`.
[source,console]
----
POST /logs/_doc?pipeline=lowercase_message_field
{
  "@timestamp": "2020-12-08T11:12:01.000Z",
  "user": {
    "id": "I1YBEOxJ"
  },
  "message": "LOGIN Successful"
}
----
// TEST[continued]
////
[source,console]
----
DELETE /_ingest/pipeline/lowercase_message_field
----
// TEST[continued]
////
====
[discrete]
[[search-a-data-stream]]
=== Search a data stream
The following search APIs support data streams:
* <<search-search, Search>>
* <<async-search, Async search>>
* <<search-multi-search, Multi search>>
* <<search-field-caps, Field capabilities>>
////
* <<eql-search-api, EQL search>>
////
.*Example*
[%collapsible]
====
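The following <<search-search,search API>> request searches the `logs` data
stream for documents with a timestamp from the start of yesterday up to, but
not including, the start of today. Because the `message` match is a `should`
clause, documents with a `message` value of `login successful` score higher,
but other documents in the time range still match.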
[source,console]
----
GET /logs/_search
{
  "query": {
    "bool": {
      "must": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      },
      "should": {
        "match": {
          "message": "login successful"
        }
      }
    }
  }
}
----
====
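To make the date math in the preceding range query concrete, the following
Python sketch computes the absolute bounds that `now-1d/d` and `now/d` resolve
to for a given instant. It assumes the default UTC time zone (no `time_zone`
parameter on the query). The `yesterday_range` function is a hypothetical
helper written for this illustration.

[source,python]
----
from datetime import datetime, timedelta, timezone


def yesterday_range(now):
    """Return (gte, lt) bounds equivalent to `now-1d/d` and `now/d`.

    Hypothetical helper; assumes rounding happens in UTC.
    """
    now = now.astimezone(timezone.utc)
    # `now/d` rounds down to the start of the current day.
    start_of_today = now.replace(hour=0, minute=0, second=0, microsecond=0)
    # `now-1d/d` subtracts one day, then rounds down to the start of that day.
    start_of_yesterday = start_of_today - timedelta(days=1)
    return start_of_yesterday, start_of_today


gte, lt = yesterday_range(datetime(2020, 12, 8, 15, 30, tzinfo=timezone.utc))
print(gte.isoformat(), lt.isoformat())
----

For an instant on 8 December 2020, the range covers all of 7 December and
excludes 8 December.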
You can use a comma-separated list or wildcard (`*`) expression to search
multiple data streams, indices, and index aliases in the same request.
.*Example*
[%collapsible]
====
The following request searches the `logs` and `logs_alt` data streams, which are
specified as a comma-separated list in the request path.
[source,console]
----
GET /logs,logs_alt/_search
{
  "query": {
    "match": {
      "user.id": "8a4f500d"
    }
  }
}
----
The following request uses the `logs*` wildcard expression to search any data
stream, index, or index alias beginning with `logs`.
[source,console]
----
GET /logs*/_search
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----
The following search request omits a target in the request path. The request
searches all data streams and indices in the cluster.
[source,console]
----
GET /_search
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  }
}
----
====
[discrete]
[[get-stats-for-a-data-stream]]
=== Get statistics for a data stream
You can use the <<data-stream-stats-api,data stream stats API>> to retrieve
statistics for one or more data streams. These statistics include:
* A count of the stream's backing indices
* The total store size of all shards for the stream's backing indices
* The highest `@timestamp` value for the stream
.*Example*
[%collapsible]
====
The following data stream stats API request retrieves statistics for the
`logs` data stream.
[source,console]
----
GET /_data_stream/logs/_stats?human=true
----
The API returns the following response.
[source,console-result]
----
{
  "_shards": {
    "total": 6,
    "successful": 3,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 3,
  "total_store_size": "624b",
  "total_store_size_bytes": 624,
  "data_streams": [
    {
      "data_stream": "logs",
      "backing_indices": 3,
      "store_size": "624b",
      "store_size_bytes": 624,
      "maximum_timestamp": 1607339167000
    }
  ]
}
----
// TESTRESPONSE[s/"total_store_size": "624b"/"total_store_size": $body.total_store_size/]
// TESTRESPONSE[s/"total_store_size_bytes": 624/"total_store_size_bytes": $body.total_store_size_bytes/]
// TESTRESPONSE[s/"store_size": "624b"/"store_size": $body.data_streams.0.store_size/]
// TESTRESPONSE[s/"store_size_bytes": 624/"store_size_bytes": $body.data_streams.0.store_size_bytes/]
====
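The `maximum_timestamp` value in the preceding response is expressed in
milliseconds since the Unix epoch. As a small sketch, the following Python
snippet converts such a value to a readable UTC date. The `millis_to_utc`
function is a hypothetical helper, not part of any API.

[source,python]
----
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def millis_to_utc(millis):
    """Convert epoch milliseconds to a timezone-aware UTC datetime.

    Hypothetical helper for reading `maximum_timestamp` values.
    """
    return EPOCH + timedelta(milliseconds=millis)


max_ts = millis_to_utc(1607339167000)
print(max_ts.isoformat())
----

The converted value corresponds to the `2020-12-07T11:06:07.000Z` timestamp
used in this page's example documents.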
[discrete]
[[manually-roll-over-a-data-stream]]
=== Manually roll over a data stream
A rollover creates a new backing index for a data stream. This new backing index
becomes the stream's <<data-stream-write-index,write index>>, and the rollover
increments the stream's <<data-streams-generation,generation>>.
In most cases, we recommend using <<index-lifecycle-management,{ilm-init}>> to
automate rollovers for data streams. This lets you automatically roll over the
current write index when it meets specified criteria, such as a maximum age or
size.
However, you can also use the <<indices-rollover-index,rollover API>> to
manually perform a rollover. This can be useful if you want to
<<data-streams-change-mappings-and-settings,apply mapping or setting changes>>
to the stream's write index after updating a data stream's template.
.*Example*
[%collapsible]
====
The following <<indices-rollover-index,rollover API>> request submits a manual
rollover request for the `logs` data stream.
[source,console]
----
POST /logs/_rollover/
----
====
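The backing index names used in this page's examples, such as
`.ds-logs-000003`, combine the stream name with a zero-padded generation. The
following Python sketch reproduces that pattern. It is an assumption inferred
from the example names on this page, and naming can differ between versions,
so read actual backing index names from API responses rather than computing
them. The `backing_index_name` function is a hypothetical helper.

[source,python]
----
def backing_index_name(stream, generation):
    """Build a backing index name in the style used on this page.

    Assumption: `.ds-<stream>-<six-digit generation>`, as seen in the
    examples. Real clusters may use a different pattern.
    """
    return f".ds-{stream}-{generation:06d}"


# After two rollovers, the `logs` stream in the examples is at generation 3.
print(backing_index_name("logs", 3))
----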
[discrete]
[[open-closed-backing-indices]]
=== Open closed backing indices
You may <<indices-close,close>> one or more of a data stream's backing indices
as part of its {ilm-init} lifecycle or another workflow. A closed backing index
cannot be searched, even for searches targeting its data stream. You also can't
<<update-delete-docs-in-a-data-stream,update or delete documents>> in a closed
index.
You can re-open individual backing indices by sending an
<<indices-open-close,open request>> directly to the index.
You can also re-open all closed backing indices for a data stream by sending an
open request directly to the stream.
.*Example*
[%collapsible]
====
The following <<cat-indices,cat indices>> API request retrieves the status for
the `logs` data stream's backing indices.
////
[source,console]
----
POST /.ds-logs-000001,.ds-logs-000002/_close/
----
////
[source,console]
----
GET /_cat/indices/logs?v&s=index&h=index,status
----
// TEST[continued]
The API returns the following response. The response indicates the `logs` data
stream contains two closed backing indices: `.ds-logs-000001` and
`.ds-logs-000002`.
[source,txt]
----
index status
.ds-logs-000001 close
.ds-logs-000002 close
.ds-logs-000003 open
----
// TESTRESPONSE[non_json]
The following <<indices-open-close,open API>> request re-opens any closed
backing indices for the `logs` data stream, including `.ds-logs-000001` and
`.ds-logs-000002`.
[source,console]
----
POST /logs/_open/
----
// TEST[continued]
You can resubmit the original cat indices API request to verify the
`.ds-logs-000001` and `.ds-logs-000002` backing indices were re-opened.
[source,console]
----
GET /_cat/indices/logs?v&s=index&h=index,status
----
// TEST[continued]
The API returns the following response.
[source,txt]
----
index status
.ds-logs-000001 open
.ds-logs-000002 open
.ds-logs-000003 open
----
// TESTRESPONSE[non_json]
====
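If you need to check the cat indices output in a script, the following Python
sketch picks out the closed backing indices from text shaped like the response
shown above. The `closed_indices` function is a hypothetical helper, and the
sketch assumes the request used `v` and `h=index,status` so that a header row
with those two columns is present.

[source,python]
----
def closed_indices(cat_output):
    """Return names of indices whose status column is `close`.

    Hypothetical helper; expects a header row containing `index` and
    `status`, as produced with the `v` and `h=index,status` parameters.
    """
    rows = [line.split() for line in cat_output.strip().splitlines()]
    header, data = rows[0], rows[1:]
    index_col = header.index("index")
    status_col = header.index("status")
    return [row[index_col] for row in data if row[status_col] == "close"]


sample = """\
index           status
.ds-logs-000001 close
.ds-logs-000002 close
.ds-logs-000003 open
"""
print(closed_indices(sample))
----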
[discrete]
[[reindex-with-a-data-stream]]
=== Reindex with a data stream
You can use the <<docs-reindex,reindex API>> to copy documents to a data stream
from an existing index, index alias, or data stream.
A reindex copies documents from a _source_ to a _destination_. The source and
destination can be any pre-existing index, index alias, or data stream. However,
the source and destination must be different. You cannot reindex a data stream
into itself.
Because data streams are <<data-streams-append-only,append-only>>, a reindex
request to a data stream destination must have an `op_type` of `create`. This
means a reindex can only add new documents to a data stream. It cannot update
existing documents in the data stream destination.
A reindex can be used to:
* Convert an existing index alias and collection of time-based indices into a
data stream.
* Apply a new or updated <<create-a-data-stream-template,index template>>
by reindexing an existing data stream into a new one. This applies mapping
and setting changes in the template to each document and backing index of the
data stream destination. See
<<data-streams-use-reindex-to-change-mappings-settings>>.
TIP: If you only want to update the mappings or settings of a data stream's
write index, we recommend you update the <<create-a-data-stream-template,data
stream's template>> and perform a <<manually-roll-over-a-data-stream,rollover>>.
.*Example*
[%collapsible]
====
The following reindex request copies documents from the `archive` index alias to
the existing `logs` data stream. Because the destination is a data stream, the
request's `op_type` is `create`.
////
[source,console]
----
PUT /_bulk?refresh=wait_for
{"create":{"_index" : "archive_1"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }

POST /_aliases
{
  "actions" : [
    { "add" : { "index" : "archive_1", "alias" : "archive" } },
    { "add" : { "index" : "archive_2", "alias" : "archive", "is_write_index" : true} }
  ]
}
----
////
[source,console]
----
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "logs",
    "op_type": "create"
  }
}
----
// TEST[continued]
====
You can also reindex documents from a data stream to an index, index
alias, or data stream.
.*Example*
[%collapsible]
====
The following reindex request copies documents from the `logs` data stream
to the existing `archive` index alias. Because the destination is not a data
stream, the `op_type` does not need to be specified.
[source,console]
----
POST /_reindex
{
  "source": {
    "index": "logs"
  },
  "dest": {
    "index": "archive"
  }
}
----
// TEST[continued]
====
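The rules above (the source and destination must differ, and a data stream
destination requires an `op_type` of `create`) can be sketched as a small
Python function that builds the reindex request body. The `build_reindex_body`
name and its `dest_is_data_stream` flag are hypothetical; the caller is assumed
to already know whether the destination is a data stream.

[source,python]
----
def build_reindex_body(source, dest, dest_is_data_stream):
    """Build a reindex request body as a dict.

    Hypothetical helper; `dest_is_data_stream` is supplied by the caller.
    """
    if source == dest:
        # A data stream or index cannot be reindexed into itself.
        raise ValueError("source and destination must be different")
    dest_clause = {"index": dest}
    if dest_is_data_stream:
        # Data streams are append-only, so only `create` is allowed.
        dest_clause["op_type"] = "create"
    return {"source": {"index": source}, "dest": dest_clause}


to_stream = build_reindex_body("archive", "logs", dest_is_data_stream=True)
to_alias = build_reindex_body("logs", "archive", dest_is_data_stream=False)
print(to_stream)
print(to_alias)
----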
[discrete]
[[update-delete-docs-in-a-data-stream]]
=== Update or delete documents in a data stream
You can update or delete documents in a data stream using the following
requests:
* An <<docs-update-by-query,update by query API>> request
+
.*Example*
[%collapsible]
====
The following update by query API request updates documents in the `logs` data
stream with a `user.id` of `l7gk7f82`. The request uses a
<<modules-scripting-using,script>> to assign matching documents a new `user.id`
value of `XgdX0NoX`.
[source,console]
----
POST /logs/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
----
====
* A <<docs-delete-by-query,delete by query API>> request
+
.*Example*
[%collapsible]
====
The following delete by query API request deletes documents in the `logs` data
stream with a `user.id` of `vlb44hny`.
[source,console]
----
POST /logs/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----
====
[discrete]
[[update-delete-docs-in-a-backing-index]]
=== Update or delete documents in a backing index
Alternatively, you can update or delete documents in a data stream by sending
the update or deletion request to the backing index containing the document. To
do this, you first need to get:
* The <<mapping-id-field,document ID>>
* The name of the backing index that contains the document
If you want to update a document, you must also get its current
<<optimistic-concurrency-control,sequence number and primary term>>.
You can use a <<search-a-data-stream,search request>> to retrieve this
information.
.*Example*
[%collapsible]
====
The following search request retrieves documents in the `logs` data stream with
a `user.id` of `yWIumJd7`. By default, this search returns the document ID and
backing index for any matching documents.
The request includes a `"seq_no_primary_term": true` argument. This means the
search also returns the sequence number and primary term for any matching
documents.
[source,console]
----
GET /logs/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}
----
The API returns the following response. The `hits.hits` property contains
information for any documents matching the search.
[source,console-result]
----
{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-logs-000003", <1>
        "_type": "_doc",
        "_id": "bfspvnIBr7VVZlfp2lqX", <2>
        "_seq_no": 0, <3>
        "_primary_term": 1, <4>
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2020-12-07T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}
----
// TESTRESPONSE[s/"took": 20/"took": $body.took/]
// TESTRESPONSE[s/"max_score": 0.2876821/"max_score": $body.hits.max_score/]
// TESTRESPONSE[s/"_score": 0.2876821/"_score": $body.hits.hits.0._score/]
<1> Backing index containing the matching document
<2> Document ID for the document
<3> Current sequence number for the document
<4> Primary term for the document
====
You can use an <<docs-index_,index API>> request to update an individual
document. To prevent an accidental overwrite, this request must include valid
`if_seq_no` and `if_primary_term` arguments.
.*Example*
[%collapsible]
====
The following index API request updates an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index.
The request also includes the current sequence number and primary term in the
respective `if_seq_no` and `if_primary_term` query parameters. The request body
contains a new JSON source for the document.
[source,console]
----
PUT /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
====
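The search and update steps above can be tied together in a script. The
following Python sketch takes one entry from a search response's `hits.hits`
array and derives the request path for the corresponding index API update,
including the `if_seq_no` and `if_primary_term` query parameters. The
`update_request_path` function is a hypothetical helper, and the sketch only
builds the path; it does not send a request.

[source,python]
----
from urllib.parse import quote, urlencode


def update_request_path(hit):
    """Build the index API path for updating the document in `hit`.

    Hypothetical helper; `hit` is one entry from `hits.hits` in a search
    response that was requested with `"seq_no_primary_term": true`.
    """
    query = urlencode(
        {
            "if_seq_no": hit["_seq_no"],
            "if_primary_term": hit["_primary_term"],
        }
    )
    index = quote(hit["_index"], safe="")
    doc_id = quote(hit["_id"], safe="")
    return f"/{index}/_doc/{doc_id}?{query}"


hit = {
    "_index": ".ds-logs-000003",
    "_id": "bfspvnIBr7VVZlfp2lqX",
    "_seq_no": 0,
    "_primary_term": 1,
}
path = update_request_path(hit)
print("PUT " + path)
----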
You can use the <<docs-delete,delete API>> to delete individual documents.
Deletion requests do not require a sequence number or primary term.
.*Example*
[%collapsible]
====
The following delete API request deletes an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index.
[source,console]
----
DELETE /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX
----
====
You can use the <<docs-bulk,bulk API>> to delete or update multiple documents in
one request using `delete`, `index`, or `update` actions.
If the action type is `index`, the action must include valid
<<bulk-optimistic-concurrency-control,`if_seq_no` and `if_primary_term`>>
arguments.
.*Example*
[%collapsible]
====
The following bulk API request uses an `index` action to update an existing
document in the `logs` data stream.
The `index` action targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index. The action also includes the current sequence
number and primary term in the respective `if_seq_no` and `if_primary_term`
parameters.
[source,console]
----
PUT /_bulk?refresh
{ "index": { "_index": ".ds-logs-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
----
====
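As a rough sketch of how such a bulk `index` action might be generated, the
following Python snippet builds the action line and the source line from a
search hit and a replacement document. The `bulk_index_lines` function is a
hypothetical helper, and the sketch assumes the hit was returned by a search
that requested `"seq_no_primary_term": true`.

[source,python]
----
import json


def bulk_index_lines(hit, new_source):
    """Return the two NDJSON lines for a guarded bulk `index` action.

    Hypothetical helper; `hit` supplies the backing index, document ID,
    sequence number, and primary term.
    """
    action = {
        "index": {
            "_index": hit["_index"],
            "_id": hit["_id"],
            # These guard against overwriting a concurrent change.
            "if_seq_no": hit["_seq_no"],
            "if_primary_term": hit["_primary_term"],
        }
    }
    return json.dumps(action) + "\n" + json.dumps(new_source) + "\n"


hit = {
    "_index": ".ds-logs-000003",
    "_id": "bfspvnIBr7VVZlfp2lqX",
    "_seq_no": 0,
    "_primary_term": 1,
}
new_source = {
    "@timestamp": "2020-12-07T11:06:07.000Z",
    "user": {"id": "8a4f500d"},
    "message": "Login successful",
}
bulk_body = bulk_index_lines(hit, new_source)
print(bulk_body, end="")
----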