[role="xpack"]
[[use-a-data-stream]]
== Use a data stream

After you set up a data stream, you can do the following:

* <<add-documents-to-a-data-stream>>
* <<search-a-data-stream>>
* <<get-stats-for-a-data-stream>>
* <<manually-roll-over-a-data-stream>>
* <<open-closed-backing-indices>>
* <<reindex-with-a-data-stream>>
* <<update-delete-docs-in-a-data-stream>>
* <<update-delete-docs-in-a-backing-index>>

////
[source,console]
----
PUT /_index_template/logs_data_stream
{
  "index_patterns": [ "logs*" ],
  "data_stream": { }
}

PUT /_data_stream/logs

POST /logs/_rollover/

POST /logs/_rollover/

PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "yWIumJd7"
  },
  "message": "Login successful"
}

PUT /_data_stream/logs_alt
----
// TESTSETUP

[source,console]
----
DELETE /_data_stream/*

DELETE /_index_template/*
----
// TEARDOWN
////

[discrete]
[[add-documents-to-a-data-stream]]
=== Add documents to a data stream

You can add documents to a data stream using the following requests:

* An index API request with an `op_type` set to `create`. Specify the data
stream's name in place of an index name.
+
--
NOTE: The `op_type` parameter defaults to `create` when adding new documents.

.*Example: Index API request*
[%collapsible]
====
The following index API request adds a new document to the `logs` data
stream.

[source,console]
----
POST /logs/_doc/
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
====

IMPORTANT: You cannot add new documents to a data stream using the index API's
`PUT //_doc/<_id>` request format. To specify a document ID, use the
`PUT //_create/<_id>` format instead.
--

* A bulk API request using the `create` action. Specify the data stream's name
in place of an index name.
+
--
NOTE: Data streams do not support other bulk actions, such as `index`.

.*Example: Bulk API request*
[%collapsible]
====
The following bulk API request adds several new documents to the `logs` data
stream. Note that only the `create` action is used.

[source,console]
----
PUT /logs/_bulk?refresh
{"create":{ }}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{ }}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{ }}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
----
====
--

You can use an ingest pipeline with these requests to pre-process data before
it's indexed.

.*Example: Ingest pipeline*
[%collapsible]
====
The following put pipeline API request creates the `lowercase_message_field`
ingest pipeline. The pipeline uses the `lowercase` processor to change the
`message` field value to lowercase before indexing.

[source,console]
----
PUT /_ingest/pipeline/lowercase_message_field
{
  "description" : "Lowercases the message field value",
  "processors" : [
    {
      "lowercase" : {
        "field" : "message"
      }
    }
  ]
}
----
// TEST[continued]

The following index API request adds a new document to the `logs` data stream.

The request includes a `?pipeline=lowercase_message_field` query parameter.
This parameter indicates {es} should use the `lowercase_message_field` pipeline
to pre-process the document before indexing it.

During pre-processing, the pipeline changes the letter case of the document's
`message` field value from `LOGIN Successful` to `login successful`.

[source,console]
----
POST /logs/_doc?pipeline=lowercase_message_field
{
  "@timestamp": "2020-12-08T11:12:01.000Z",
  "user": {
    "id": "I1YBEOxJ"
  },
  "message": "LOGIN Successful"
}
----
// TEST[continued]

////
[source,console]
----
DELETE /_ingest/pipeline/lowercase_message_field
----
// TEST[continued]
////
====

[discrete]
[[search-a-data-stream]]
=== Search a data stream

The following search APIs support data streams:

* <>
* <>
* <>
* <>
////
* <>
////

.*Example*
[%collapsible]
====
The following search API request searches the `logs` data stream for documents
with a timestamp between today and yesterday that also have a `message` value
of `login successful`.

[source,console]
----
GET /logs/_search
{
  "query": {
    "bool": {
      "must": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      },
      "should": {
        "match": {
          "message": "login successful"
        }
      }
    }
  }
}
----
====

You can use a comma-separated list or wildcard (`*`) expression to search
multiple data streams, indices, and index aliases in the same request.

.*Example*
[%collapsible]
====
The following request searches the `logs` and `logs_alt` data streams, which
are specified as a comma-separated list in the request path.

[source,console]
----
GET /logs,logs_alt/_search
{
  "query": {
    "match": {
      "user.id": "8a4f500d"
    }
  }
}
----

The following request uses the `logs*` wildcard expression to search any data
stream, index, or index alias beginning with `logs`.

[source,console]
----
GET /logs*/_search
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----

The following search request omits a target in the request path. The request
searches all data streams and indices in the cluster.

[source,console]
----
GET /_search
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  }
}
----
====

[discrete]
[[get-stats-for-a-data-stream]]
=== Get statistics for a data stream

You can use the data stream stats API to retrieve statistics for one or more
data streams.
These statistics include:

* A count of the stream's backing indices
* The total store size of all shards for the stream's backing indices
* The highest `@timestamp` value for the stream

.*Example*
[%collapsible]
====
The following data stream stats API request retrieves statistics for the
`logs` data stream.

[source,console]
----
GET /_data_stream/logs/_stats?human=true
----

The API returns the following response.

[source,console-result]
----
{
  "_shards": {
    "total": 6,
    "successful": 3,
    "failed": 0
  },
  "data_stream_count": 1,
  "backing_indices": 3,
  "total_store_size": "624b",
  "total_store_size_bytes": 624,
  "data_streams": [
    {
      "data_stream": "logs",
      "backing_indices": 3,
      "store_size": "624b",
      "store_size_bytes": 624,
      "maximum_timestamp": 1607339167000
    }
  ]
}
----
// TESTRESPONSE[s/"total_store_size": "624b"/"total_store_size": $body.total_store_size/]
// TESTRESPONSE[s/"total_store_size_bytes": 624/"total_store_size_bytes": $body.total_store_size_bytes/]
// TESTRESPONSE[s/"store_size": "624b"/"store_size": $body.data_streams.0.store_size/]
// TESTRESPONSE[s/"store_size_bytes": 624/"store_size_bytes": $body.data_streams.0.store_size_bytes/]
====

[discrete]
[[manually-roll-over-a-data-stream]]
=== Manually roll over a data stream

A rollover creates a new backing index for a data stream. This new backing
index becomes the stream's write index and increments the stream's generation.

In most cases, we recommend using {ilm-init} to automate rollovers for data
streams. This lets you automatically roll over the current write index when it
meets specified criteria, such as a maximum age or size.

However, you can also use the rollover API to manually perform a rollover. This
can be useful if you want to apply mapping or setting changes to the stream's
write index after updating a data stream's template.

.*Example*
[%collapsible]
====
The following rollover API request submits a manual rollover request for the
`logs` data stream.

[source,console]
----
POST /logs/_rollover/
----
====

[discrete]
[[open-closed-backing-indices]]
=== Open closed backing indices

You may close one or more of a data stream's backing indices as part of its
{ilm-init} lifecycle or another workflow. A closed backing index cannot be
searched, even for searches targeting its data stream. You also can't
update or delete documents in a closed index.

You can re-open individual backing indices by sending an open request directly
to the index.

You can also re-open all closed backing indices for a data stream by sending an
open request directly to the stream.

.*Example*
[%collapsible]
====
The following cat indices API request retrieves the status for the `logs` data
stream's backing indices.

////
[source,console]
----
POST /.ds-logs-000001,.ds-logs-000002/_close/
----
////

[source,console]
----
GET /_cat/indices/logs?v&s=index&h=index,status
----
// TEST[continued]

The API returns the following response. The response indicates the `logs` data
stream contains two closed backing indices: `.ds-logs-000001` and
`.ds-logs-000002`.

[source,txt]
----
index           status
.ds-logs-000001 close
.ds-logs-000002 close
.ds-logs-000003 open
----
// TESTRESPONSE[non_json]

The following open API request re-opens any closed backing indices for the
`logs` data stream, including `.ds-logs-000001` and `.ds-logs-000002`.

[source,console]
----
POST /logs/_open/
----
// TEST[continued]

You can resubmit the original cat indices API request to verify the
`.ds-logs-000001` and `.ds-logs-000002` backing indices were re-opened.

[source,console]
----
GET /_cat/indices/logs?v&s=index&h=index,status
----
// TEST[continued]

The API returns the following response.

[source,txt]
----
index           status
.ds-logs-000001 open
.ds-logs-000002 open
.ds-logs-000003 open
----
// TESTRESPONSE[non_json]
====

[discrete]
[[reindex-with-a-data-stream]]
=== Reindex with a data stream

You can use the reindex API to copy documents to a data stream from an existing
index, index alias, or data stream.

A reindex copies documents from a _source_ to a _destination_. The source and
destination can be any pre-existing index, index alias, or data stream. However,
the source and destination must be different. You cannot reindex a data stream
into itself.

Because data streams are append-only, a reindex request to a data stream
destination must have an `op_type` of `create`. This means a reindex can only
add new documents to a data stream. It cannot update existing documents in the
data stream destination.

A reindex can be used to:

* Convert an existing index alias and collection of time-based indices into a
  data stream.

* Apply a new or updated index template by reindexing an existing data stream
  into a new one. This applies mapping and setting changes in the template to
  each document and backing index of the data stream destination. See <>.

TIP: If you only want to update the mappings or settings of a data stream's
write index, we recommend you update the data stream's template and perform a
<<manually-roll-over-a-data-stream,rollover>>.

.*Example*
[%collapsible]
====
The following reindex request copies documents from the `archive` index alias to
the existing `logs` data stream. Because the destination is a data stream, the
request's `op_type` is `create`.

////
[source,console]
----
PUT /_bulk?refresh=wait_for
{"create":{"_index" : "archive_1"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }

POST /_aliases
{
  "actions" : [
    { "add" : { "index" : "archive_1", "alias" : "archive" } },
    { "add" : { "index" : "archive_2", "alias" : "archive", "is_write_index" : true} }
  ]
}
----
////

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "logs",
    "op_type": "create"
  }
}
----
// TEST[continued]
====

You can also reindex documents from a data stream to an index, index
alias, or data stream.

.*Example*
[%collapsible]
====
The following reindex request copies documents from the `logs` data stream
to the existing `archive` index alias. Because the destination is not a data
stream, the `op_type` does not need to be specified.

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "logs"
  },
  "dest": {
    "index": "archive"
  }
}
----
// TEST[continued]
====

[discrete]
[[update-delete-docs-in-a-data-stream]]
=== Update or delete documents in a data stream

You can update or delete documents in a data stream using the following
requests:

* An update by query API request
+
.*Example*
[%collapsible]
====
The following update by query API request updates documents in the `logs` data
stream with a `user.id` of `l7gk7f82`. The request uses a script to assign
matching documents a new `user.id` value of `XgdX0NoX`.

[source,console]
----
POST /logs/_update_by_query
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
----
====

* A delete by query API request
+
.*Example*
[%collapsible]
====
The following delete by query API request deletes documents in the `logs` data
stream with a `user.id` of `vlb44hny`.

[source,console]
----
POST /logs/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----
====

[discrete]
[[update-delete-docs-in-a-backing-index]]
=== Update or delete documents in a backing index

Alternatively, you can update or delete documents in a data stream by sending
the update or deletion request to the backing index containing the document. To
do this, you first need to get:

* The document ID
* The name of the backing index that contains the document

If you want to update a document, you must also get its current sequence number
and primary term.

You can use a search request to retrieve this information.

.*Example*
[%collapsible]
====
The following search request retrieves documents in the `logs` data stream with
a `user.id` of `yWIumJd7`. By default, this search returns the document ID and
backing index for any matching documents.

The request includes a `"seq_no_primary_term": true` argument. This means the
search also returns the sequence number and primary term for any matching
documents.

[source,console]
----
GET /logs/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}
----

The API returns the following response. The `hits.hits` property contains
information for any documents matching the search.

[source,console-result]
----
{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 3,
    "successful": 3,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-logs-000003",                <1>
        "_type": "_doc",
        "_id": "bfspvnIBr7VVZlfp2lqX",              <2>
        "_seq_no": 0,                               <3>
        "_primary_term": 1,                         <4>
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2020-12-07T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}
----
// TESTRESPONSE[s/"took": 20/"took": $body.took/]
// TESTRESPONSE[s/"max_score": 0.2876821/"max_score": $body.hits.max_score/]
// TESTRESPONSE[s/"_score": 0.2876821/"_score": $body.hits.hits.0._score/]

<1> Backing index containing the matching document
<2> Document ID for the document
<3> Current sequence number for the document
<4> Primary term for the document
====

You can use an index API request to update an individual document. To prevent
an accidental overwrite, this request must include valid `if_seq_no` and
`if_primary_term` arguments.

.*Example*
[%collapsible]
====
The following index API request updates an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index.

The request also includes the current sequence number and primary term in the
respective `if_seq_no` and `if_primary_term` query parameters. The request body
contains a new JSON source for the document.

[source,console]
----
PUT /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=0&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
====

You can use the delete API to delete individual documents. Deletion requests do
not require a sequence number or primary term.

.*Example*
[%collapsible]
====
The following delete API request deletes an existing document in the `logs` data
stream.
The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index.

[source,console]
----
DELETE /.ds-logs-000003/_doc/bfspvnIBr7VVZlfp2lqX
----
====

You can use the bulk API to delete or update multiple documents in one request
using `delete`, `index`, or `update` actions.

If the action type is `index`, the action must include valid sequence number and
primary term arguments.

.*Example*
[%collapsible]
====
The following bulk API request uses an `index` action to update an existing
document in the `logs` data stream.

The `index` action targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000003` backing index. The action also includes the current sequence
number and primary term in the respective `if_seq_no` and `if_primary_term`
parameters.

[source,console]
----
PUT /_bulk?refresh
{ "index": { "_index": ".ds-logs-000003", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 0, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
----
====
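
If you assemble such bulk requests in a client program, the action line can be derived from a search hit that was returned with `"seq_no_primary_term": true`. The following Python snippet is a minimal sketch of that step, not part of any {es} client library. The `build_bulk_update` helper name is hypothetical, and the sketch only builds the newline-delimited JSON body; sending it to the `_bulk` endpoint is left to whichever HTTP client you use.

```python
import json


def build_bulk_update(hit, new_source):
    """Build the NDJSON body for one bulk `index` action (hypothetical helper).

    `hit` is a single entry from `hits.hits` in a search response that
    included `"seq_no_primary_term": true`.
    """
    action = {
        "index": {
            # Target the backing index that holds the document, not the stream.
            "_index": hit["_index"],
            "_id": hit["_id"],
            # Guard against overwriting a document that changed after the search.
            "if_seq_no": hit["_seq_no"],
            "if_primary_term": hit["_primary_term"],
        }
    }
    # Each JSON object sits on its own line, and the body ends with a newline.
    return json.dumps(action) + "\n" + json.dumps(new_source) + "\n"


# Values copied from the search response shown earlier in this section.
hit = {
    "_index": ".ds-logs-000003",
    "_id": "bfspvnIBr7VVZlfp2lqX",
    "_seq_no": 0,
    "_primary_term": 1,
}
new_source = {
    "@timestamp": "2020-12-07T11:06:07.000Z",
    "user": {"id": "8a4f500d"},
    "message": "Login successful",
}

body = build_bulk_update(hit, new_source)
print(body, end="")
```

Reading `_seq_no` and `_primary_term` from the hit, rather than hard-coding them, keeps the request tied to the version of the document that the search returned.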