[[use-a-data-stream]]
== Use a data stream
After you <<set-up-a-data-stream,set up a data stream>>, you can do
the following:
* <<add-documents-to-a-data-stream>>
* <<search-a-data-stream>>
* <<manually-roll-over-a-data-stream>>
* <<reindex-with-a-data-stream>>
* <<update-delete-docs-in-a-data-stream>>
////
[source,console]
----
PUT /_index_template/logs_data_stream
{
"index_patterns": [ "logs*" ],
"data_stream": {
"timestamp_field": "@timestamp"
},
"template": {
"mappings": {
"properties": {
"@timestamp": {
"type": "date"
}
}
}
}
}
PUT /_data_stream/logs
----
////
[discrete]
[[add-documents-to-a-data-stream]]
=== Add documents to a data stream
You can add documents to a data stream using the following requests:
* An <<docs-index_,index API>> request with an
<<docs-index-api-op_type,`op_type`>> set to `create`. Specify the data
stream's name in place of an index name.
+
--
NOTE: The `op_type` parameter defaults to `create` when adding new documents.
.*Example: Index API request*
[%collapsible]
====
The following index API request adds a new document to the `logs` data
stream.
[source,console]
----
POST /logs/_doc/
{
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
----
// TEST[continued]
====
--
* A <<docs-bulk,bulk API>> request using the `create` action. Specify the data
stream's name in place of an index name.
+
--
NOTE: Data streams do not support other bulk actions, such as `index`.
.*Example: Bulk API request*
[%collapsible]
====
The following bulk API request adds several new documents to
the `logs` data stream. Note that only the `create` action is used.
[source,console]
----
PUT /logs/_bulk?refresh
{"create":{"_index" : "logs"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{"_index" : "logs"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{"_index" : "logs"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
----
// TEST[continued]
====
--
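As a client-side illustration of the bulk format above, the following Python
sketch builds a newline-delimited request body that uses only `create` actions.
The helper name `build_create_bulk_body` is hypothetical and not part of any
client library. The sketch only assembles the body and sends nothing.

[source,python]
----
import json


def build_create_bulk_body(stream, docs):
    """Return an NDJSON bulk body that adds each doc with a `create` action."""
    lines = []
    for doc in docs:
        # Data streams accept only the `create` bulk action.
        lines.append(json.dumps({"create": {"_index": stream}}))
        lines.append(json.dumps(doc))
    # The bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


body = build_create_bulk_body("logs", [
    {"@timestamp": "2020-12-08T11:04:05.000Z", "message": "Login attempt failed"},
    {"@timestamp": "2020-12-08T11:06:07.000Z", "message": "Login successful"},
])
print(body, end="")
----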
[discrete]
[[search-a-data-stream]]
=== Search a data stream
The following search APIs support data streams:
* <<search-search, Search>>
* <<async-search, Async search>>
* <<search-multi-search, Multi search>>
* <<search-field-caps, Field capabilities>>
////
* <<eql-search-api, EQL search>>
////
.*Example*
[%collapsible]
====
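The following <<search-search,search API>> request searches the `logs` data
stream for documents with a timestamp between the start of yesterday and the
start of today. Because the `match` query is a `should` clause, documents with
a `message` value matching `login successful` score higher, but a match is not
required.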
[source,console]
----
GET /logs/_search
{
"query": {
"bool": {
"must": {
"range": {
"@timestamp": {
"gte": "now-1d/d",
"lt": "now/d"
}
}
},
"should": {
"match": {
"message": "login successful"
}
}
}
}
}
----
// TEST[continued]
====
You can use a comma-separated list or wildcard (`*`) expression to search
multiple data streams, indices, and index aliases in the same request.
.*Example*
[%collapsible]
====
////
[source,console]
----
PUT /_data_stream/logs_alt
----
// TEST[continued]
////
The following request searches the `logs` and `logs_alt` data streams, which are
specified as a comma-separated list in the request path.
[source,console]
----
GET /logs,logs_alt/_search
{
"query": {
"match": {
"user.id": "8a4f500d"
}
}
}
----
// TEST[continued]
The following request uses the `logs*` wildcard expression to search any data
stream, index, or index alias beginning with `logs`.
[source,console]
----
GET /logs*/_search
{
"query": {
"match": {
"user.id": "vlb44hny"
}
}
}
----
// TEST[continued]
The following search request omits a target in the request path. The request
searches all data streams and indices in the cluster.
[source,console]
----
GET /_search
{
"query": {
"match": {
"user.id": "l7gk7f82"
}
}
}
----
// TEST[continued]
====
[discrete]
[[manually-roll-over-a-data-stream]]
=== Manually roll over a data stream
A rollover creates a new backing index for a data stream. This new backing index
becomes the stream's <<data-stream-write-index,write index>> and increments
the stream's <<data-streams-generation,generation>>.
In most cases, we recommend using <<index-lifecycle-management,{ilm-init}>> to
automate rollovers for data streams. This lets you automatically roll over the
current write index when it meets specified criteria, such as a maximum age or
size.
However, you can also use the <<indices-rollover-index,rollover API>> to
manually perform a rollover. This can be useful if you want to
<<data-streams-change-mappings-and-settings,apply mapping or setting changes>>
to the stream's write index after updating a data stream's template.
.*Example*
[%collapsible]
====
The following <<indices-rollover-index,rollover API>> request manually rolls
over the `logs` data stream if its write index meets the `max_docs` condition.
[source,console]
----
POST /logs/_rollover/
{
"conditions": {
"max_docs": "1"
}
}
----
// TEST[continued]
====
[discrete]
[[reindex-with-a-data-stream]]
=== Reindex with a data stream
You can use the <<docs-reindex,reindex API>> to copy documents to a data stream
from an existing index, index alias, or data stream.
A reindex copies documents from a _source_ to a _destination_. The source and
destination can be any pre-existing index, index alias, or data stream. However,
the source and destination must be different. You cannot reindex a data stream
into itself.
Because data streams are <<data-streams-append-only,append-only>>, a reindex
request to a data stream destination must have an `op_type` of `create`. This
means a reindex can only add new documents to a data stream. It cannot update
existing documents in the data stream destination.
A reindex can be used to:
* Convert an existing index alias and collection of time-based indices into a
data stream.
* Apply a new or updated <<create-a-data-stream-template,composable template>>
by reindexing an existing data stream into a new one. This applies mapping
and setting changes in the template to each document and backing index of the
data stream destination. See
<<data-streams-use-reindex-to-change-mappings-settings>>.
TIP: If you only want to update the mappings or settings of a data stream's
write index, we recommend you update the <<create-a-data-stream-template,data
stream's template>> and perform a <<manually-roll-over-a-data-stream,rollover>>.
.*Example*
[%collapsible]
====
The following reindex request copies documents from the `archive` index alias to
the existing `logs` data stream. Because the destination is a data stream, the
request's `op_type` is `create`.
////
[source,console]
----
PUT /_bulk?refresh=wait_for
{"create":{"_index" : "archive_1"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }
POST /_aliases
{
"actions" : [
{ "add" : { "index" : "archive_1", "alias" : "archive" } },
{ "add" : { "index" : "archive_2", "alias" : "archive", "is_write_index" : true} }
]
}
----
// TEST[continued]
////
[source,console]
----
POST /_reindex
{
"source": {
"index": "archive"
},
"dest": {
"index": "logs",
"op_type": "create"
}
}
----
// TEST[continued]
====
You can also reindex documents from a data stream to an index, index
alias, or data stream.
.*Example*
[%collapsible]
====
The following reindex request copies documents from the `logs` data stream
to the existing `archive` index alias. Because the destination is not a data
stream, the `op_type` does not need to be specified.
[source,console]
----
POST /_reindex
{
"source": {
"index": "logs"
},
"dest": {
"index": "archive"
}
}
----
// TEST[continued]
====
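The two reindex rules above (the source and destination must differ, and a data
stream destination requires an `op_type` of `create`) can be sketched as a small
Python helper that builds the request body. The function name
`build_reindex_body` and its `dest_is_data_stream` flag are hypothetical. The
caller is assumed to know whether the destination is a data stream.

[source,python]
----
def build_reindex_body(source, dest, dest_is_data_stream):
    """Build a reindex request body for the given source and destination."""
    if source == dest:
        # A data stream or index cannot be reindexed into itself.
        raise ValueError("source and destination must be different")
    dest_clause = {"index": dest}
    if dest_is_data_stream:
        # Data streams are append-only, so only `create` is allowed.
        dest_clause["op_type"] = "create"
    return {"source": {"index": source}, "dest": dest_clause}


to_stream = build_reindex_body("archive", "logs", dest_is_data_stream=True)
to_alias = build_reindex_body("logs", "archive", dest_is_data_stream=False)
----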
[discrete]
[[update-delete-docs-in-a-data-stream]]
=== Update or delete documents in a data stream
Data streams are designed to be <<data-streams-append-only,append-only>>. This
means you cannot send update or deletion requests for existing documents to a
data stream. However, you can send update or deletion requests to the backing
index containing the document.
To delete or update a document in a data stream, you first need to get:
* The <<mapping-id-field,document ID>>
* The name of the backing index that contains the document
If you want to update a document, you must also get its current
<<optimistic-concurrency-control,sequence number and primary term>>.
You can use a <<search-a-data-stream,search request>> to retrieve this
information.
.*Example*
[%collapsible]
====
////
[source,console]
----
PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "yWIumJd7"
},
"message": "Login successful"
}
----
// TEST[continued]
////
The following search request retrieves documents in the `logs` data stream with
a `user.id` of `yWIumJd7`. By default, this search returns the document ID and
backing index for any matching documents.
The request includes a `"seq_no_primary_term": true` argument. This means the
search also returns the sequence number and primary term for any matching
documents.
[source,console]
----
GET /logs/_search
{
"seq_no_primary_term": true,
"query": {
"match": {
"user.id": "yWIumJd7"
}
}
}
----
// TEST[continued]
The API returns the following response. The `hits.hits` property contains
information for any documents matching the search.
[source,console-result]
----
{
"took": 20,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 0.2876821,
"hits": [
{
"_index": ".ds-logs-000002", <1>
"_type": "_doc",
"_id": "bfspvnIBr7VVZlfp2lqX", <2>
"_seq_no": 4, <3>
"_primary_term": 1, <4>
"_score": 0.2876821,
"_source": {
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "yWIumJd7"
},
"message": "Login successful"
}
}
]
}
}
----
// TESTRESPONSE[s/"took": 20/"took": $body.took/]
<1> Backing index containing the matching document
<2> Document ID for the document
<3> Current sequence number for the document
<4> Primary term for the document
====
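As a rough sketch of how a client might collect these values, the following
Python snippet reads the backing index, document ID, sequence number, and
primary term from each hit. The helper name `hit_coordinates` is hypothetical,
and the `response` dictionary is an abbreviated copy of the response shown
above.

[source,python]
----
def hit_coordinates(response):
    """Collect the values needed to update or delete each matching document."""
    return [
        {
            "index": hit["_index"],            # backing index
            "id": hit["_id"],                  # document ID
            "seq_no": hit["_seq_no"],          # requires seq_no_primary_term
            "primary_term": hit["_primary_term"],
        }
        for hit in response["hits"]["hits"]
    ]


response = {
    "hits": {
        "hits": [
            {
                "_index": ".ds-logs-000002",
                "_id": "bfspvnIBr7VVZlfp2lqX",
                "_seq_no": 4,
                "_primary_term": 1,
            }
        ]
    }
}
coords = hit_coordinates(response)
----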
You can use an <<docs-index_,index API>> request to update an individual
document. To prevent an accidental overwrite, this request must include valid
`if_seq_no` and `if_primary_term` arguments.
.*Example*
[%collapsible]
====
The following index API request updates an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000002` backing index.
The request also includes the current sequence number and primary term in the
respective `if_seq_no` and `if_primary_term` query parameters. The request body
contains a new JSON source for the document.
[source,console]
----
PUT /.ds-logs-000002/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=4&if_primary_term=1
{
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "8a4f500d"
},
"message": "Login successful"
}
----
// TEST[continued]
====
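The request path for such a conditional update can be assembled from the values
returned by the search. The following Python sketch shows one way to do this.
The function name `conditional_index_path` is hypothetical, and the sketch only
builds the path without sending a request.

[source,python]
----
from urllib.parse import quote, urlencode


def conditional_index_path(index, doc_id, seq_no, primary_term):
    """Build an index API path guarded by sequence number and primary term."""
    params = urlencode({"if_seq_no": seq_no, "if_primary_term": primary_term})
    # Percent-encode the index name and ID in case they contain reserved characters.
    return f"/{quote(index, safe='')}/_doc/{quote(doc_id, safe='')}?{params}"


path = conditional_index_path(".ds-logs-000002", "bfspvnIBr7VVZlfp2lqX", 4, 1)
print(path)
# -> /.ds-logs-000002/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=4&if_primary_term=1
----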
You can use the <<docs-delete,delete API>> to delete individual documents.
Deletion requests do not require a sequence number or primary term.
.*Example*
[%collapsible]
====
The following delete API request deletes an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000002` backing index.
[source,console]
----
DELETE /.ds-logs-000002/_doc/bfspvnIBr7VVZlfp2lqX
----
// TEST[continued]
====
You can use the <<docs-bulk,bulk API>> to delete or update multiple documents in
one request using `delete`, `index`, or `update` actions.
If the action type is `index`, the action must include valid
<<bulk-optimistic-concurrency-control,`if_seq_no` and `if_primary_term`>>
arguments.
.*Example*
[%collapsible]
====
////
[source,console]
----
PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
"@timestamp": "2020-12-07T11:06:07.000Z",
"user": {
"id": "yWIumJd7"
},
"message": "Login successful"
}
----
// TEST[continued]
////
The following bulk API request uses an `index` action to update an existing
document in the `logs` data stream.
The `index` action targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000002` backing index. The action also includes the current sequence
number and primary term in the respective `if_seq_no` and `if_primary_term`
parameters.
[source,console]
----
PUT /_bulk?refresh
{ "index": { "_index": ".ds-logs-000002", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 4, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
----
// TEST[continued]
====
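A bulk body that mixes action types can be assembled in a similar way. The
following Python sketch combines an `index` action, which carries `if_seq_no`
and `if_primary_term`, with a `delete` action, which has no source line. The
helper name `build_backing_index_bulk_body` and the document ID
`hypothetical-doc-id` are assumptions made for illustration only.

[source,python]
----
import json


def build_backing_index_bulk_body(actions):
    """Build an NDJSON bulk body from (action, metadata, source) tuples."""
    lines = []
    for action, metadata, source in actions:
        lines.append(json.dumps({action: metadata}))
        # A `delete` action is not followed by a source line.
        if action != "delete":
            lines.append(json.dumps(source))
    return "\n".join(lines) + "\n"


bulk_body = build_backing_index_bulk_body([
    (
        "index",
        {
            "_index": ".ds-logs-000002",
            "_id": "bfspvnIBr7VVZlfp2lqX",
            "if_seq_no": 4,
            "if_primary_term": 1,
        },
        {"@timestamp": "2020-12-07T11:06:07.000Z", "message": "Login successful"},
    ),
    # Hypothetical second document, used only to show a delete action.
    ("delete", {"_index": ".ds-logs-000002", "_id": "hypothetical-doc-id"}, None),
])
print(bulk_body, end="")
----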
////
[source,console]
----
DELETE /_data_stream/logs
DELETE /_data_stream/logs_alt
DELETE /_index_template/logs_data_stream
----
// TEST[continued]
////