[[use-a-data-stream]]
== Use a data stream

After you <<set-up-a-data-stream,set up a data stream>>, you can do
the following:

* <<add-documents-to-a-data-stream>>
* <<search-a-data-stream>>
* <<manually-roll-over-a-data-stream>>
* <<reindex-with-a-data-stream>>
* <<update-delete-docs-in-a-data-stream>>
* <<update-delete-docs-in-a-backing-index>>

////
[source,console]
----
PUT /_index_template/logs_data_stream
{
  "index_patterns": [ "logs*" ],
  "data_stream": {
    "timestamp_field": "@timestamp"
  },
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        }
      }
    }
  }
}

PUT /_data_stream/logs
----
////

[discrete]
[[add-documents-to-a-data-stream]]
=== Add documents to a data stream

You can add documents to a data stream using the following requests:

* An <<docs-index_,index API>> request with an
<<docs-index-api-op_type,`op_type`>> set to `create`. Specify the data
stream's name in place of an index name.
+
--
NOTE: If the request does not specify a document ID, the `op_type` parameter
defaults to `create`.

.*Example: Index API request*
[%collapsible]
====
The following index API request adds a new document to the `logs` data
stream.

[source,console]
----
POST /logs/_doc/
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
// TEST[continued]
====

IMPORTANT: You cannot add new documents to a data stream using the index API's
`PUT /<target>/_doc/<_id>` request format. To specify a document ID, use the
`PUT /<target>/_create/<_id>` format instead.
--

* A <<docs-bulk,bulk API>> request using the `create` action. Specify the data
stream's name in place of an index name.
+
--
NOTE: Data streams do not support other bulk actions, such as `index`.

.*Example: Bulk API request*
[%collapsible]
====
The following bulk API request adds several new documents to
the `logs` data stream. Note that only the `create` action is used.

[source,console]
----
PUT /logs/_bulk?refresh
{"create":{"_index" : "logs"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z", "user": { "id": "vlb44hny" }, "message": "Login attempt failed" }
{"create":{"_index" : "logs"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
{"create":{"_index" : "logs"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z", "user": { "id": "l7gk7f82" }, "message": "Logout successful" }
----
// TEST[continued]
====
--

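As a minimal sketch of the `PUT /<target>/_create/<_id>` format, the following
request adds a document with an explicit ID to the `logs` data stream. The
document ID `my-doc-id` and the field values are placeholders chosen for
illustration only.

[source,console]
----
PUT /logs/_create/my-doc-id
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
// TEST[skip:illustrative sketch]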
[discrete]
[[search-a-data-stream]]
=== Search a data stream

The following search APIs support data streams:

* <<search-search, Search>>
* <<async-search, Async search>>
* <<search-multi-search, Multi search>>
* <<search-field-caps, Field capabilities>>
////
* <<eql-search-api, EQL search>>
////

.*Example*
[%collapsible]
====
The following <<search-search,search API>> request searches the `logs` data
stream for documents with a timestamp from yesterday, that is, from `now-1d/d`
up to but not including `now/d`. Because the `match` query is in a `should`
clause, documents with a `message` value matching `login successful` score
higher, but documents without a match are still returned.

[source,console]
----
GET /logs/_search
{
  "query": {
    "bool": {
      "must": {
        "range": {
          "@timestamp": {
            "gte": "now-1d/d",
            "lt": "now/d"
          }
        }
      },
      "should": {
        "match": {
          "message": "login successful"
        }
      }
    }
  }
}
----
// TEST[continued]
====

You can use a comma-separated list or wildcard (`*`) expression to search
multiple data streams, indices, and index aliases in the same request.

.*Example*
[%collapsible]
====
////
[source,console]
----
PUT /_data_stream/logs_alt
----
// TEST[continued]
////

The following request searches the `logs` and `logs_alt` data streams, which are
specified as a comma-separated list in the request path.

[source,console]
----
GET /logs,logs_alt/_search
{
  "query": {
    "match": {
      "user.id": "8a4f500d"
    }
  }
}
----
// TEST[continued]

The following request uses the `logs*` wildcard expression to search any data
stream, index, or index alias beginning with `logs`.

[source,console]
----
GET /logs*/_search
{
  "query": {
    "match": {
      "user.id": "vlb44hny"
    }
  }
}
----
// TEST[continued]

The following search request omits a target in the request path. The request
searches all data streams and indices in the cluster.

[source,console]
----
GET /_search
{
  "query": {
    "match": {
      "user.id": "l7gk7f82"
    }
  }
}
----
// TEST[continued]
====

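The other search APIs listed at the start of this section accept a data stream
name in the same way. As a sketch, assuming the `logs` and `logs_alt` data
streams used in the examples above, the following
<<search-multi-search,multi search API>> request runs a separate search against
each stream in a single request.

[source,console]
----
GET /_msearch
{ "index": "logs" }
{ "query": { "match": { "user.id": "8a4f500d" } } }
{ "index": "logs_alt" }
{ "query": { "match": { "user.id": "vlb44hny" } } }
----
// TEST[skip:illustrative sketch]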
[discrete]
[[manually-roll-over-a-data-stream]]
=== Manually roll over a data stream

A rollover creates a new backing index for a data stream. The new backing index
becomes the stream's <<data-stream-write-index,write index>>, and the rollover
increments the stream's <<data-streams-generation,generation>>.

In most cases, we recommend using <<index-lifecycle-management,{ilm-init}>> to
automate rollovers for data streams. This lets you automatically roll over the
current write index when it meets specified criteria, such as a maximum age or
size.

However, you can also use the <<indices-rollover-index,rollover API>> to
manually perform a rollover. This can be useful if you want to
<<data-streams-change-mappings-and-settings,apply mapping or setting changes>>
to the stream's write index after updating a data stream's template.

.*Example*
[%collapsible]
====
The following <<indices-rollover-index,rollover API>> request submits a manual
rollover request for the `logs` data stream.

[source,console]
----
POST /logs/_rollover/
{
  "conditions": {
    "max_docs": "1"
  }
}
----
// TEST[continued]
====

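The `conditions` object in the rollover API is optional. As a sketch of an
unconditional manual rollover, the following request omits the request body, so
the `logs` data stream should roll over without first checking any condition.
This is an illustrative variant of the example above, not a recommended default.

[source,console]
----
POST /logs/_rollover/
----
// TEST[skip:illustrative sketch]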
[discrete]
[[reindex-with-a-data-stream]]
=== Reindex with a data stream

You can use the <<docs-reindex,reindex API>> to copy documents to a data stream
from an existing index, index alias, or data stream.

A reindex copies documents from a _source_ to a _destination_. The source and
destination can be any pre-existing index, index alias, or data stream. However,
the source and destination must be different. You cannot reindex a data stream
into itself.

Because data streams are <<data-streams-append-only,append-only>>, a reindex
request to a data stream destination must have an `op_type` of `create`. This
means a reindex can only add new documents to a data stream. It cannot update
existing documents in the data stream destination.

A reindex can be used to:

* Convert an existing index alias and collection of time-based indices into a
data stream.

* Apply a new or updated <<create-a-data-stream-template,index template>>
by reindexing an existing data stream into a new one. This applies mapping
and setting changes in the template to each document and backing index of the
data stream destination. See
<<data-streams-use-reindex-to-change-mappings-settings>>.

TIP: If you only want to update the mappings or settings of a data stream's
write index, we recommend you update the <<create-a-data-stream-template,data
stream's template>> and perform a <<manually-roll-over-a-data-stream,rollover>>.

.*Example*
[%collapsible]
====
The following reindex request copies documents from the `archive` index alias to
the existing `logs` data stream. Because the destination is a data stream, the
request's `op_type` is `create`.

////
[source,console]
----
PUT /_bulk?refresh=wait_for
{"create":{"_index" : "archive_1"}}
{ "@timestamp": "2020-12-08T11:04:05.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-08T11:06:07.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }
{"create":{"_index" : "archive_2"}}
{ "@timestamp": "2020-12-09T11:07:08.000Z" }

POST /_aliases
{
  "actions" : [
    { "add" : { "index" : "archive_1", "alias" : "archive" } },
    { "add" : { "index" : "archive_2", "alias" : "archive", "is_write_index" : true} }
  ]
}
----
// TEST[continued]
////

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "archive"
  },
  "dest": {
    "index": "logs",
    "op_type": "create"
  }
}
----
// TEST[continued]
====

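Reindexing an existing data stream into a new one follows the same pattern. As
a sketch, assume a hypothetical `new_logs` data stream has already been created
from the new or updated template. The `new_logs` name is a placeholder and does
not appear elsewhere in this guide. The following request copies the documents
from the `logs` data stream into it.

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "logs"
  },
  "dest": {
    "index": "new_logs",
    "op_type": "create"
  }
}
----
// TEST[skip:illustrative sketch]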
You can also reindex documents from a data stream to an index, index
alias, or data stream.

.*Example*
[%collapsible]
====
The following reindex request copies documents from the `logs` data stream
to the existing `archive` index alias. Because the destination is not a data
stream, the `op_type` does not need to be specified.

[source,console]
----
POST /_reindex
{
  "source": {
    "index": "logs"
  },
  "dest": {
    "index": "archive"
  }
}
----
// TEST[continued]
====

[discrete]
[[update-delete-docs-in-a-data-stream]]
=== Update or delete documents in a data stream

You can update or delete documents in a data stream using the following
requests:

* An <<docs-update-by-query,update by query API>> request
+
.*Example*
[%collapsible]
====
The following update by query API request updates documents in the `logs` data
stream with a `user.id` of `i96BP1mA`. The request uses a
<<modules-scripting-using,script>> to assign matching documents a new `user.id`
value of `XgdX0NoX`.

////
[source,console]
----
PUT /logs/_create/2?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "i96BP1mA"
  }
}
----
// TEST[continued]
////

[source,console]
----
POST /logs/_update_by_query
{
  "query": {
    "match": {
      "user.id": "i96BP1mA"
    }
  },
  "script": {
    "source": "ctx._source.user.id = params.new_id",
    "params": {
      "new_id": "XgdX0NoX"
    }
  }
}
----
// TEST[continued]
====

* A <<docs-delete-by-query,delete by query API>> request
+
.*Example*
[%collapsible]
====
The following delete by query API request deletes documents in the `logs` data
stream with a `user.id` of `zVZMamUM`.

////
[source,console]
----
PUT /logs/_create/1?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "zVZMamUM"
  }
}
----
// TEST[continued]
////

[source,console]
----
POST /logs/_delete_by_query
{
  "query": {
    "match": {
      "user.id": "zVZMamUM"
    }
  }
}
----
// TEST[continued]
====

[discrete]
[[update-delete-docs-in-a-backing-index]]
=== Update or delete documents in a backing index

Alternatively, you can update or delete documents in a data stream by sending
the update or deletion request to the backing index containing the document. To
do this, you first need to get:

* The <<mapping-id-field,document ID>>
* The name of the backing index that contains the document

If you want to update a document, you must also get its current
<<optimistic-concurrency-control,sequence number and primary term>>.

You can use a <<search-a-data-stream,search request>> to retrieve this
information.

.*Example*
[%collapsible]
====
////
[source,console]
----
PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "yWIumJd7"
  },
  "message": "Login successful"
}
----
// TEST[continued]
////

The following search request retrieves documents in the `logs` data stream with
a `user.id` of `yWIumJd7`. By default, this search returns the document ID and
backing index for any matching documents.

The request includes a `"seq_no_primary_term": true` argument. This means the
search also returns the sequence number and primary term for any matching
documents.

[source,console]
----
GET /logs/_search
{
  "seq_no_primary_term": true,
  "query": {
    "match": {
      "user.id": "yWIumJd7"
    }
  }
}
----
// TEST[continued]

The API returns the following response. The `hits.hits` property contains
information for any documents matching the search.

[source,console-result]
----
{
  "took": 20,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": {
      "value": 1,
      "relation": "eq"
    },
    "max_score": 0.2876821,
    "hits": [
      {
        "_index": ".ds-logs-000002",                <1>
        "_type": "_doc",
        "_id": "bfspvnIBr7VVZlfp2lqX",              <2>
        "_seq_no": 8,                               <3>
        "_primary_term": 1,                         <4>
        "_score": 0.2876821,
        "_source": {
          "@timestamp": "2020-12-07T11:06:07.000Z",
          "user": {
            "id": "yWIumJd7"
          },
          "message": "Login successful"
        }
      }
    ]
  }
}
----
// TESTRESPONSE[s/"took": 20/"took": $body.took/]
// TESTRESPONSE[s/"max_score": 0.2876821/"max_score": $body.hits.max_score/]
// TESTRESPONSE[s/"_score": 0.2876821/"_score": $body.hits.hits.0._score/]

<1> Backing index containing the matching document
<2> Document ID for the document
<3> Current sequence number for the document
<4> Primary term for the document
====

You can use an <<docs-index_,index API>> request to update an individual
document. To prevent an accidental overwrite, this request must include valid
`if_seq_no` and `if_primary_term` arguments.

.*Example*
[%collapsible]
====
The following index API request updates an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000002` backing index.

The request also includes the current sequence number and primary term in the
respective `if_seq_no` and `if_primary_term` query parameters. The request body
contains a new JSON source for the document.

[source,console]
----
PUT /.ds-logs-000002/_doc/bfspvnIBr7VVZlfp2lqX?if_seq_no=8&if_primary_term=1
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "8a4f500d"
  },
  "message": "Login successful"
}
----
// TEST[continued]
====

You can use the <<docs-delete,delete API>> to delete individual documents.
Deletion requests do not require a sequence number or primary term.

.*Example*
[%collapsible]
====
The following delete API request deletes an existing document in the `logs` data
stream. The request targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000002` backing index.

[source,console]
----
DELETE /.ds-logs-000002/_doc/bfspvnIBr7VVZlfp2lqX
----
// TEST[continued]
====

You can use the <<docs-bulk,bulk API>> to delete or update multiple documents in
one request using `delete`, `index`, or `update` actions.

If the action type is `index`, the action must include valid
<<bulk-optimistic-concurrency-control,`if_seq_no` and `if_primary_term`>>
arguments.

.*Example*
[%collapsible]
====
////
[source,console]
----
PUT /logs/_create/bfspvnIBr7VVZlfp2lqX?refresh=wait_for
{
  "@timestamp": "2020-12-07T11:06:07.000Z",
  "user": {
    "id": "yWIumJd7"
  },
  "message": "Login successful"
}
----
// TEST[continued]
////

The following bulk API request uses an `index` action to update an existing
document in the `logs` data stream.

The `index` action targets document ID `bfspvnIBr7VVZlfp2lqX` in the
`.ds-logs-000002` backing index. The action also includes the current sequence
number and primary term in the respective `if_seq_no` and `if_primary_term`
parameters.

[source,console]
----
PUT /_bulk?refresh
{ "index": { "_index": ".ds-logs-000002", "_id": "bfspvnIBr7VVZlfp2lqX", "if_seq_no": 8, "if_primary_term": 1 } }
{ "@timestamp": "2020-12-07T11:06:07.000Z", "user": { "id": "8a4f500d" }, "message": "Login successful" }
----
// TEST[continued]
====

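The `update` and `delete` bulk actions target a backing index in the same way.
As a sketch, the following request partially updates one document and deletes
another in the `.ds-logs-000002` backing index. The second document ID,
`another-doc-id`, is a made-up placeholder, and the new `message` value is
illustrative only.

[source,console]
----
PUT /_bulk?refresh
{ "update": { "_index": ".ds-logs-000002", "_id": "bfspvnIBr7VVZlfp2lqX" } }
{ "doc": { "message": "Login successful (verified)" } }
{ "delete": { "_index": ".ds-logs-000002", "_id": "another-doc-id" } }
----
// TEST[skip:illustrative sketch]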
////
[source,console]
----
DELETE /_data_stream/logs

DELETE /_data_stream/logs_alt

DELETE /_index_template/logs_data_stream
----
// TEST[continued]
////