Ensure template exists when creating data stream (#57275)

Backporting #56888 to 7.x branch.

Limit the creation of data streams to namespaces that have a matching composable template with a data stream definition.

This way we ensure that mappings/settings have been specified and will be used at data stream creation and data stream rollover.

Also remove the `timestamp_field` parameter from the create data stream request and
let the create data stream API resolve the timestamp field
from the data stream definition inside the matching composable template.
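
For illustration, the new flow looks like this (the template and data stream names are only examples):

PUT _index_template/template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {
    "timestamp_field": "@timestamp"
  }
}

# No request body needed; the timestamp field is resolved from the matching template
PUT _data_stream/my-data-stream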

Relates to #53100
Martijn van Groningen 2020-05-28 15:08:25 +02:00 committed by GitHub
parent 51158a2d8b
commit 225ccd1cfa
22 changed files with 399 additions and 128 deletions

View File

@ -23,18 +23,32 @@ addressed directly, data streams are integrated with the
<<index-lifecycle-management, index lifecycle management (ILM)>> to facilitate
the management of the time series data contained in their backing indices.
A data stream can only be created if the namespace it targets has a matching composable
template with a `data_stream` definition.
[source,console]
-----------------------------------
PUT _index_template/template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {
    "timestamp_field": "@timestamp"
  }
}
-----------------------------------
// TEST
[source,console]
--------------------------------------------------
PUT _data_stream/my-data-stream
--------------------------------------------------
// TEST[continued]
////
[source,console]
-----------------------------------
DELETE /_data_stream/my-data-stream
DELETE /_index_template/template
-----------------------------------
// TEST[continued]
////
@ -71,11 +85,3 @@ Data stream names must meet the following criteria:
will count towards the 255 limit faster)
--
[[indices-create-data-stream-api-request-body]]
==== {api-request-body-title}
`timestamp_field`::
(Required, string) The name of the timestamp field. This field must be present
in all documents indexed into the data stream and must be of type
<<date, `date`>> or <<date_nanos, `date_nanos`>>.

View File

@ -9,10 +9,15 @@ Deletes an existing data stream along with its backing indices.
////
[source,console]
-----------------------------------
PUT _index_template/template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {
    "timestamp_field": "@timestamp"
  }
}

PUT /_data_stream/my-data-stream
-----------------------------------
// TESTSETUP
////
@ -22,6 +27,13 @@ PUT /_data_stream/my-data-stream
DELETE _data_stream/my-data-stream
--------------------------------------------------
////
[source,console]
-----------------------------------
DELETE /_index_template/template
-----------------------------------
// TEST[continued]
////
[[delete-data-stream-api-request]]
==== {api-request-title}

View File

@ -9,10 +9,15 @@ Returns information about one or more data streams.
////
[source,console]
-----------------------------------
PUT _index_template/template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {
    "timestamp_field": "@timestamp"
  }
}

PUT /_data_stream/my-data-stream
-----------------------------------
// TESTSETUP
////
@ -21,6 +26,7 @@ PUT /_data_stream/my-data-stream
[source,console]
-----------------------------------
DELETE /_data_stream/my-data-stream
DELETE /_index_template/template
-----------------------------------
// TEARDOWN
////

View File

@ -26,7 +26,7 @@ specify settings, mappings, and aliases.
If a new index matches more than one index template, the index template with the highest priority is used.
If an index is created with explicit settings and also matches an index template,
the settings from the create index request take precedence over settings specified in the index template and its component templates.
[source,console]
@ -112,7 +112,7 @@ DELETE _component_template/*
[[put-index-template-api-desc]]
==== {api-description-title}
Creates or updates an index template.
// tag::index-template-def[]
Index templates define <<index-modules-settings,settings>> and <<mapping,mappings>> that you can
@ -555,3 +555,36 @@ PUT /_index_template/template_1
--------------------------------------------------
To check the `_meta`, you can use the <<indices-get-template, get index template>> API.
[[data-stream-definition]]
===== Data stream definition
If a composable template should auto-create a data stream instead of an index, then add
a `data_stream` definition to the composable template.
[source,console]
--------------------------------------------------
PUT /_index_template/template_1
{
  "index_patterns": ["logs-*"],
  "template": {
    "mappings": {
      "properties": {
        "@timestamp": {
          "type": "date"
        }
      }
    }
  },
  "data_stream": {
    "timestamp_field": "@timestamp"
  }
}
--------------------------------------------------
Required properties of a data stream definition:
`timestamp_field`::
(Required, string) The name of the timestamp field. This field must be present
in all documents indexed into the data stream and must be of type
<<date, `date`>> or <<date_nanos, `date_nanos`>>.
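
For example, creating a data stream whose name matches the `logs-*` pattern resolves its
timestamp field from the `data_stream` definition in `template_1`, so the create data stream
request needs no body (the name `logs-foobar` below is only illustrative):

[source,console]
--------------------------------------------------
# logs-foobar is a hypothetical data stream name matching the logs-* pattern of template_1
PUT /_data_stream/logs-foobar
--------------------------------------------------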

View File

@ -145,12 +145,12 @@ The document count does *not* include documents in replica shards.
`max_size`::
(Optional, <<byte-units, byte units>>)
Maximum index size.
This is the total size of all primary shards in the index.
Replicas are not counted toward the maximum index size.
TIP: To see the current index size, use the <<cat-indices, _cat indices>> API.
The `pri.store.size` value shows the combined size of all primary shards.
--
include::{docdir}/rest-api/common-parms.asciidoc[tag=mappings]
@ -214,6 +214,86 @@ The API returns the following response:
<2> Whether the rollover was dry run.
<3> The result of each condition.
[[rollover-data-stream-ex]]
===== Roll over a data stream
[source,console]
-----------------------------------
PUT _index_template/template
{
  "index_patterns": ["my-data-stream*"],
  "data_stream": {
    "timestamp_field": "@timestamp"
  }
}
-----------------------------------
// TEST
[source,console]
--------------------------------------------------
PUT /_data_stream/my-data-stream <1>

# Add > 1000 documents to my-data-stream

POST /my-data-stream/_rollover <2>
{
  "conditions": {
    "max_age": "7d",
    "max_docs": 1000,
    "max_size": "5gb"
  }
}
--------------------------------------------------
// TEST[continued]
// TEST[setup:huge_twitter]
// TEST[s/# Add > 1000 documents to my-data-stream/POST _reindex?refresh\n{"source":{"index":"twitter"},"dest":{"index":"my-data-stream-000001"}}/]
<1> Creates a data stream called `my-data-stream` with one initial backing index
named `my-data-stream-000001`.
<2> This request creates a new backing index, `my-data-stream-000002`, and adds
it as the write index for the `my-data-stream` data stream if the current
write index meets at least one of the following conditions:
+
--
* The index was created 7 or more days ago.
* The index has an index size of 5GB or greater.
* The index contains 1,000 or more documents.
--
The API returns the following response:
[source,console-result]
--------------------------------------------------
{
  "acknowledged": true,
  "shards_acknowledged": true,
  "old_index": "my-data-stream-000001", <1>
  "new_index": "my-data-stream-000002", <2>
  "rolled_over": true, <3>
  "dry_run": false, <4>
  "conditions": { <5>
    "[max_age: 7d]": false,
    "[max_docs: 1000]": true,
    "[max_size: 5gb]": false
  }
}
--------------------------------------------------
<1> The previous write index for the data stream.
<2> The new write index for the data stream.
<3> Whether the index was rolled over.
<4> Whether the rollover was dry run.
<5> The result of each condition.
////
[source,console]
-----------------------------------
DELETE /_data_stream/my-data-stream
DELETE /_index_template/*
-----------------------------------
// TEST[continued]
////
[[rollover-index-settings-ex]]
===== Specify settings for the target index

View File

@ -24,8 +24,7 @@
"params":{
},
"body":{
"description":"The data stream definition",
"required":true
"description":"The data stream definition"
}
}
}

View File

@ -1,3 +1,25 @@
setup:
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template1] has index patterns [simple-data-stream1*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
indices.put_index_template:
name: my-template1
body:
index_patterns: [simple-data-stream1]
data_stream:
timestamp_field: '@timestamp'
- do:
allowed_warnings:
- "index template [my-template2] has index patterns [simple-data-stream2*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template2] will take precedence during new index creation"
indices.put_index_template:
name: my-template2
body:
index_patterns: [simple-data-stream2]
data_stream:
timestamp_field: '@timestamp2'
---
"Create data stream":
- skip:
@ -7,15 +29,11 @@
- do:
indices.create_data_stream:
name: simple-data-stream1
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do:
indices.create_data_stream:
name: simple-data-stream2
body:
timestamp_field: "@timestamp2"
- is_true: acknowledged
- do:
@ -69,11 +87,10 @@
catch: bad_request
indices.create_data_stream:
name: invalid-data-stream#-name
body:
timestamp_field: "@timestamp"
- match: { status: 400 }
- match: { error.root_cause.0.type: "illegal_argument_exception" }
- match: { error.root_cause.0.reason: "data_stream [invalid-data-stream#-name] must not contain '#'" }
---
"Get data stream":
@ -83,41 +100,37 @@
- do:
indices.create_data_stream:
name: get-data-stream1
body:
timestamp_field: "@timestamp"
name: simple-data-stream1
- is_true: acknowledged
- do:
indices.create_data_stream:
name: get-data-stream2
body:
timestamp_field: "@timestamp2"
name: simple-data-stream2
- is_true: acknowledged
- do:
indices.get_data_stream: {}
- match: { 0.name: get-data-stream1 }
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.generation: 1 }
- match: { 1.name: get-data-stream2 }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.generation: 1 }
- do:
indices.get_data_stream:
name: get-data-stream1
- match: { 0.name: get-data-stream1 }
name: simple-data-stream1
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.generation: 1 }
- do:
indices.get_data_stream:
name: get-data-*
- match: { 0.name: get-data-stream1 }
name: simple-data-stream*
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.generation: 1 }
- match: { 1.name: get-data-stream2 }
- match: { 1.name: simple-data-stream2 }
- match: { 1.timestamp_field: '@timestamp2' }
- match: { 1.generation: 1 }
@ -136,12 +149,12 @@
- do:
indices.delete_data_stream:
name: get-data-stream1
name: simple-data-stream1
- is_true: acknowledged
- do:
indices.delete_data_stream:
name: get-data-stream2
name: simple-data-stream2
- is_true: acknowledged
---
@ -152,9 +165,7 @@
- do:
indices.create_data_stream:
name: delete-data-stream1
body:
timestamp_field: "@timestamp"
name: simple-data-stream1
- is_true: acknowledged
- do:
@ -167,25 +178,25 @@
- do:
indices.get:
index: ['delete-data-stream1-000001', 'test_index']
index: ['simple-data-stream1-000001', 'test_index']
- is_true: test_index.settings
- is_true: delete-data-stream1-000001.settings
- is_true: simple-data-stream1-000001.settings
- do:
indices.get_data_stream: {}
- match: { 0.name: delete-data-stream1 }
- match: { 0.name: simple-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- match: { 0.generation: 1 }
- length: { 0.indices: 1 }
- match: { 0.indices.0.index_name: 'delete-data-stream1-000001' }
- match: { 0.indices.0.index_name: 'simple-data-stream1-000001' }
- do:
indices.delete_data_stream:
name: delete-data-stream1
name: simple-data-stream1
- is_true: acknowledged
- do:
catch: missing
indices.get:
index: "delete-data-stream1-000001"
index: "simple-data-stream1-000001"

View File

@ -1,14 +1,23 @@
---
"Test apis that do not supported data streams":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
version: " - 7.99.99"
reason: "mute bwc until backported"
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [logs-*]
data_stream:
timestamp_field: '@timestamp'
- do:
indices.create_data_stream:
name: logs-foobar
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do:

View File

@ -1,14 +1,25 @@
setup:
- skip:
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template] has index patterns [simple-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [simple-*]
data_stream:
timestamp_field: '@timestamp'
---
"Delete backing index on data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
version: " - 7.99.99"
reason: "mute bwc until backported"
- do:
indices.create_data_stream:
name: simple-data-stream
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
# rollover data stream to create new backing index
@ -55,14 +66,12 @@
---
"Attempt to delete write index on data stream is rejected":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
version: " - 7.99.99"
reason: "mute bwc until backported"
- do:
indices.create_data_stream:
name: simple-data-stream
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
# rollover data stream to create new backing index

View File

@ -1,14 +1,23 @@
---
"Get backing indices for data stream":
- skip:
version: " - 7.8.99"
version: " - 7.9.99"
reason: "data streams only supported in 7.9+"
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [data-*]
data_stream:
timestamp_field: '@timestamp'
- do:
indices.create_data_stream:
name: data-stream1
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do:

View File

@ -1,14 +1,23 @@
---
"Roll over a data stream":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
version: " - 7.99.99"
reason: "mute bwc until backported"
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template] has index patterns [data-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [data-*]
data_stream:
timestamp_field: '@timestamp'
- do:
indices.create_data_stream:
name: data-stream-for-rollover
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
# rollover data stream to create new backing index

View File

@ -39,7 +39,6 @@ import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.Template;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@ -219,14 +218,12 @@ public class BulkIntegrationIT extends ESIntegTestCase {
}
}
public void testMixedAutoCreate() {
Settings settings = Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0).build();
public void testMixedAutoCreate() throws Exception {
PutComposableIndexTemplateAction.Request createTemplateRequest = new PutComposableIndexTemplateAction.Request("logs-foo");
createTemplateRequest.indexTemplate(
new ComposableIndexTemplate(
Collections.singletonList("logs-foo*"),
new Template(settings, null, null),
null,
null, null, null, null,
new ComposableIndexTemplate.DataStreamTemplate("@timestamp"))
);

View File

@ -28,6 +28,8 @@ import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
import org.elasticsearch.action.admin.indices.rollover.RolloverResponse;
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
import org.elasticsearch.action.bulk.BulkRequest;
@ -41,13 +43,17 @@ import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.ESIntegTestCase;
import org.junit.After;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@ -72,13 +78,19 @@ import static org.hamcrest.Matchers.notNullValue;
public class DataStreamIT extends ESIntegTestCase {
@After
public void deleteAllComposableTemplates() {
DeleteComposableIndexTemplateAction.Request deleteTemplateRequest = new DeleteComposableIndexTemplateAction.Request("*");
client().execute(DeleteComposableIndexTemplateAction.INSTANCE, deleteTemplateRequest).actionGet();
}
public void testBasicScenario() throws Exception {
createIndexTemplate("id1", "metrics-foo*", "@timestamp1");
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request("metrics-foo");
createDataStreamRequest.setTimestampFieldName("@timestamp1");
client().admin().indices().createDataStream(createDataStreamRequest).get();
createIndexTemplate("id2", "metrics-bar*", "@timestamp2");
createDataStreamRequest = new CreateDataStreamAction.Request("metrics-bar");
createDataStreamRequest.setTimestampFieldName("@timestamp2");
client().admin().indices().createDataStream(createDataStreamRequest).get();
GetDataStreamAction.Request getDataStreamRequest = new GetDataStreamAction.Request("*");
@ -151,9 +163,9 @@ public class DataStreamIT extends ESIntegTestCase {
}
public void testOtherWriteOps() throws Exception {
createIndexTemplate("id", "metrics-foobar*", "@timestamp1");
String dataStreamName = "metrics-foobar";
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
createDataStreamRequest.setTimestampFieldName("@timestamp1");
client().admin().indices().createDataStream(createDataStreamRequest).get();
{
@ -199,10 +211,10 @@ public class DataStreamIT extends ESIntegTestCase {
}
}
public void testDataStreamsResolvability() {
public void testDataStreamsResolvability() throws Exception {
createIndexTemplate("id", "logs-*", "ts");
String dataStreamName = "logs-foobar";
CreateDataStreamAction.Request request = new CreateDataStreamAction.Request(dataStreamName);
request.setTimestampFieldName("ts");
client().admin().indices().createDataStream(request).actionGet();
verifyResolvability(dataStreamName, client().prepareIndex(dataStreamName, "_doc")
@ -230,7 +242,6 @@ public class DataStreamIT extends ESIntegTestCase {
verifyResolvability(dataStreamName, client().prepareFieldCaps(dataStreamName).setFields("*"), false);
request = new CreateDataStreamAction.Request("logs-barbaz");
request.setTimestampFieldName("ts");
client().admin().indices().createDataStream(request).actionGet();
verifyResolvability("logs-barbaz", client().prepareIndex("logs-barbaz", "_doc")
.setSource("{}", XContentType.JSON)
@ -325,4 +336,16 @@ public class DataStreamIT extends ESIntegTestCase {
"] matches a data stream, specify the corresponding concrete indices instead."));
}
static void createIndexTemplate(String id, String pattern, String timestampFieldName) throws IOException {
PutComposableIndexTemplateAction.Request request = new PutComposableIndexTemplateAction.Request(id);
request.indexTemplate(
new ComposableIndexTemplate(
Collections.singletonList(pattern),
null,
null, null, null, null,
new ComposableIndexTemplate.DataStreamTemplate(timestampFieldName))
);
client().execute(PutComposableIndexTemplateAction.INSTANCE, request).actionGet();
}
}

View File

@ -124,7 +124,7 @@ public final class AutoCreateAction extends ActionType<CreateIndexResponse> {
DataStreamTemplate dataStreamTemplate = resolveAutoCreateDataStream(request, currentState.metadata());
if (dataStreamTemplate != null) {
CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
request.index(), dataStreamTemplate.getTimestampField(), request.masterNodeTimeout(), request.timeout());
request.index(), request.masterNodeTimeout(), request.timeout());
ClusterState clusterState = metadataCreateDataStreamService.createDataStream(createRequest, currentState);
indexNameRef.set(clusterState.metadata().dataStreams().get(request.index()).getIndices().get(0).getName());
return clusterState;

View File

@ -55,39 +55,29 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
public static class Request extends AcknowledgedRequest<Request> {
private final String name;
private String timestampFieldName;
public Request(String name) {
this.name = name;
}
public void setTimestampFieldName(String timestampFieldName) {
this.timestampFieldName = timestampFieldName;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (Strings.hasText(name) == false) {
validationException = ValidateActions.addValidationError("name is missing", validationException);
}
if (Strings.hasText(timestampFieldName) == false) {
validationException = ValidateActions.addValidationError("timestamp field name is missing", validationException);
}
return validationException;
}
public Request(StreamInput in) throws IOException {
super(in);
this.name = in.readString();
this.timestampFieldName = in.readString();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(name);
out.writeString(timestampFieldName);
}
@Override
@ -95,13 +85,12 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return name.equals(request.name) &&
timestampFieldName.equals(request.timestampFieldName);
return name.equals(request.name);
}
@Override
public int hashCode() {
return Objects.hash(name, timestampFieldName);
return Objects.hash(name);
}
}
@ -132,7 +121,6 @@ public class CreateDataStreamAction extends ActionType<AcknowledgedResponse> {
ActionListener<AcknowledgedResponse> listener) throws Exception {
CreateDataStreamClusterStateUpdateRequest updateRequest = new CreateDataStreamClusterStateUpdateRequest(
request.name,
request.timestampFieldName,
request.masterNodeTimeout(),
request.timeout()
);

View File

@ -50,6 +50,7 @@ import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.ALIAS;
import static org.elasticsearch.cluster.metadata.IndexAbstraction.Type.DATA_STREAM;
import static org.elasticsearch.cluster.metadata.MetadataCreateDataStreamService.lookupTemplateForDataStream;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV1Templates;
import static org.elasticsearch.cluster.metadata.MetadataIndexTemplateService.findV2Template;
@ -142,6 +143,8 @@ public class MetadataRolloverService {
private RolloverResult rolloverDataStream(ClusterState currentState, IndexAbstraction.DataStream dataStream, String dataStreamName,
CreateIndexRequest createIndexRequest, List<Condition<?>> metConditions,
boolean silent) throws Exception {
lookupTemplateForDataStream(dataStreamName, currentState.metadata());
final DataStream ds = dataStream.getDataStream();
final IndexMetadata originalWriteIndex = dataStream.getWriteIndex();
final String newWriteIndexName = DataStream.getBackingIndexName(ds.getName(), ds.getGeneration() + 1);

View File

@ -101,14 +101,11 @@ public class MetadataCreateDataStreamService {
public static final class CreateDataStreamClusterStateUpdateRequest extends ClusterStateUpdateRequest {
private final String name;
private final String timestampFieldName;
public CreateDataStreamClusterStateUpdateRequest(String name,
String timestampFieldName,
TimeValue masterNodeTimeout,
TimeValue timeout) {
this.name = name;
this.timestampFieldName = timestampFieldName;
masterNodeTimeout(masterNodeTimeout);
ackTimeout(timeout);
}
@ -131,6 +128,8 @@ public class MetadataCreateDataStreamService {
throw new IllegalArgumentException("data_stream [" + request.name + "] must not start with '.'");
}
ComposableIndexTemplate template = lookupTemplateForDataStream(request.name, currentState.metadata());
String firstBackingIndexName = DataStream.getBackingIndexName(request.name, 1);
CreateIndexClusterStateUpdateRequest createIndexRequest =
new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName)
@ -140,9 +139,23 @@ public class MetadataCreateDataStreamService {
assert firstBackingIndex != null;
Metadata.Builder builder = Metadata.builder(currentState.metadata()).put(
new DataStream(request.name, request.timestampFieldName, Collections.singletonList(firstBackingIndex.getIndex())));
new DataStream(request.name, template.getDataStreamTemplate().getTimestampField(),
Collections.singletonList(firstBackingIndex.getIndex())));
logger.info("adding data stream [{}]", request.name);
return ClusterState.builder(currentState).metadata(builder).build();
}
public static ComposableIndexTemplate lookupTemplateForDataStream(String dataStreamName, Metadata metadata) {
final String v2Template = MetadataIndexTemplateService.findV2Template(metadata, dataStreamName, false);
if (v2Template == null) {
throw new IllegalArgumentException("no matching index template found for data stream [" + dataStreamName + "]");
}
ComposableIndexTemplate composableIndexTemplate = metadata.templatesV2().get(v2Template);
if (composableIndexTemplate.getDataStreamTemplate() == null) {
throw new IllegalArgumentException("matching index template [" + v2Template + "] for data stream [" + dataStreamName +
"] has no data stream template");
}
return composableIndexTemplate;
}
}

View File

@ -20,7 +20,6 @@ package org.elasticsearch.rest.action.admin.indices;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
@ -28,7 +27,6 @@ import org.elasticsearch.rest.action.RestToXContentListener;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class RestCreateDataStreamAction extends BaseRestHandler {
@ -47,13 +45,6 @@ public class RestCreateDataStreamAction extends BaseRestHandler {
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
CreateDataStreamAction.Request putDataStreamRequest = new CreateDataStreamAction.Request(request.param("name"));
request.withContentOrSourceParamParserOrNull(parser -> {
Map<String, Object> body = parser.map();
String timeStampFieldName = (String) body.get(DataStream.TIMESTAMP_FIELD_FIELD.getPreferredName());
if (timeStampFieldName != null) {
putDataStreamRequest.setTimestampFieldName(timeStampFieldName);
}
});
return channel -> client.admin().indices().createDataStream(putDataStreamRequest, new RestToXContentListener<>(channel));
}
}

View File

@ -35,24 +35,21 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas
@Override
protected Request createTestInstance() {
Request request = new Request(randomAlphaOfLength(8));
request.setTimestampFieldName(randomAlphaOfLength(8));
return request;
return new Request(randomAlphaOfLength(8));
}
public void testValidateRequest() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
req.setTimestampFieldName("my-timestamp-field");
ActionRequestValidationException e = req.validate();
assertNull(e);
}
public void testValidateRequestWithoutTimestampField() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("my-data-stream");
public void testValidateRequestWithoutName() {
CreateDataStreamAction.Request req = new CreateDataStreamAction.Request("");
ActionRequestValidationException e = req.validate();
assertNotNull(e);
assertThat(e.validationErrors().size(), equalTo(1));
assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing"));
assertThat(e.validationErrors().get(0), containsString("name is missing"));
}
}

View File

@ -517,7 +517,10 @@ public class MetadataRolloverServiceTests extends ESTestCase {
public void testRolloverClusterStateForDataStream() throws Exception {
final DataStream dataStream = DataStreamTests.randomInstance();
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStream.getName() + "*"),
null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
Metadata.Builder builder = Metadata.builder();
builder.put("template", template);
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
@ -577,6 +580,38 @@ public class MetadataRolloverServiceTests extends ESTestCase {
}
}
public void testRolloverClusterStateForDataStreamNoTemplate() throws Exception {
final DataStream dataStream = DataStreamTests.randomInstance();
Metadata.Builder builder = Metadata.builder();
for (Index index : dataStream.getIndices()) {
builder.put(DataStreamTestHelper.getIndexMetadataBuilderForIndex(index));
}
builder.put(dataStream);
final ClusterState clusterState = ClusterState.builder(new ClusterName("test")).metadata(builder).build();
ThreadPool testThreadPool = mock(ThreadPool.class);
ClusterService clusterService = ClusterServiceUtils.createClusterService(testThreadPool);
Environment env = mock(Environment.class);
AllocationService allocationService = mock(AllocationService.class);
IndicesService indicesService = mockIndicesServices();
IndexNameExpressionResolver mockIndexNameExpressionResolver = mock(IndexNameExpressionResolver.class);
MetadataCreateIndexService createIndexService = new MetadataCreateIndexService(Settings.EMPTY,
clusterService, indicesService, allocationService, null, env, null, testThreadPool, null, Collections.emptyList(), false);
MetadataIndexAliasesService indexAliasesService = new MetadataIndexAliasesService(clusterService, indicesService,
new AliasValidator(), null, xContentRegistry());
MetadataRolloverService rolloverService = new MetadataRolloverService(testThreadPool, createIndexService, indexAliasesService,
mockIndexNameExpressionResolver);
MaxDocsCondition condition = new MaxDocsCondition(randomNonNegativeLong());
List<Condition<?>> metConditions = Collections.singletonList(condition);
CreateIndexRequest createIndexRequest = new CreateIndexRequest("_na_");
Exception e = expectThrows(IllegalArgumentException.class, () -> rolloverService.rolloverClusterState(clusterState,
dataStream.getName(), null, createIndexRequest, metConditions, false));
assertThat(e.getMessage(), equalTo("no matching index template found for data stream [" + dataStream.getName() + "]"));
}
private IndicesService mockIndicesServices() throws Exception {
/*
* Throws Exception because Eclipse uses the lower bound for

View File

@ -43,9 +43,13 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
public void testCreateDataStream() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
ComposableIndexTemplate template = new ComposableIndexTemplate(Collections.singletonList(dataStreamName + "*"),
null, null, null, null, null, new ComposableIndexTemplate.DataStreamTemplate("@timestamp"));
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().put("template", template).build())
.build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
ClusterState newState = MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(1));
assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName));
@ -62,7 +66,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
@ -74,7 +78,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
final String dataStreamName = "_My-da#ta- ,stream-";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("must not contain the following characters"));
@ -85,7 +89,7 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
final String dataStreamName = "MAY_NOT_USE_UPPERCASE";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must be lowercase"));
@ -96,12 +100,40 @@ public class MetadataCreateDataStreamServiceTests extends ESTestCase {
final String dataStreamName = ".may_not_start_with_period";
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, "@timestamp", TimeValue.ZERO, TimeValue.ZERO);
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
() -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] must not start with '.'"));
}
public void testCreateDataStreamNoTemplate() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
Exception e = expectThrows(IllegalArgumentException.class,
() -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(), equalTo("no matching index template found for data stream [my-data-stream]"));
}
public void testCreateDataStreamNoValidTemplate() throws Exception {
final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService();
final String dataStreamName = "my-data-stream";
ComposableIndexTemplate template =
new ComposableIndexTemplate(Collections.singletonList(dataStreamName + "*"), null, null, null, null, null, null);
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().put("template", template).build())
.build();
CreateDataStreamClusterStateUpdateRequest req =
new CreateDataStreamClusterStateUpdateRequest(dataStreamName, TimeValue.ZERO, TimeValue.ZERO);
Exception e = expectThrows(IllegalArgumentException.class,
() -> MetadataCreateDataStreamService.createDataStream(metadataCreateIndexService, cs, req));
assertThat(e.getMessage(),
equalTo("matching index template [template] for data stream [my-data-stream] has no data stream template"));
}
private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception {
MetadataCreateIndexService s = mock(MetadataCreateIndexService.class);
when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean()))

View File

@ -1,14 +1,23 @@
---
"Verify data stream resolvability for xpack apis":
- skip:
version: " - 7.8.99"
reason: "data streams only supported in 7.9+"
version: " - 7.99.99"
reason: "mute bwc until backported"
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template] has index patterns [logs-*] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template] will take precedence during new index creation"
indices.put_index_template:
name: my-template
body:
index_patterns: [logs-*]
data_stream:
timestamp_field: '@timestamp'
- do:
indices.create_data_stream:
name: logs-foobar
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do: