From 39c4ec6821c7fc7cbd0bb8e558007548eca0e10c Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Thu, 2 Apr 2020 17:19:35 -0500 Subject: [PATCH] [7.x] Create first backing index when creating data stream --- .../test/indices.data_stream/10_basic.yml | 15 ++++-- .../datastream/CreateDataStreamAction.java | 27 ++++++++-- .../cluster/metadata/IndexAbstraction.java | 15 ++++-- .../cluster/metadata/Metadata.java | 27 +++++++--- .../CreateDataStreamRequestTests.java | 49 ++++++++++++++++--- .../cluster/metadata/MetadataTests.java | 27 +++++++++- 6 files changed, 134 insertions(+), 26 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 44e8fdf7200..52e56003fb1 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -1,8 +1,8 @@ --- "Create data stream": - skip: - version: " - 7.6.99" - reason: available only in 7.7+ + version: " - 7.99.99" + reason: "enable in 7.8+ after back-porting https://github.com/elastic/elasticsearch/pull/54467" - do: indices.create_data_stream: @@ -22,10 +22,17 @@ indices.get_data_streams: {} - match: { 0.name: simple-data-stream1 } - match: { 0.timestamp_field: '@timestamp' } - - match: { 0.indices: [] } + - length: { 0.indices: 1 } + - match: { 0.indices.0.index_name: 'simple-data-stream1-000001' } - match: { 1.name: simple-data-stream2 } - match: { 1.timestamp_field: '@timestamp2' } - - match: { 1.indices: [] } + - length: { 1.indices: 1 } + - match: { 1.indices.0.index_name: 'simple-data-stream2-000001' } + + - do: + indices.delete_data_stream: + name: simple-data-stream1 + - is_true: acknowledged - do: indices.delete_data_stream: diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java index 53698fe38af..a305e294fc4 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -24,6 +24,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; @@ -33,21 +34,23 @@ import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.List; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; -import java.util.Collections; import java.util.Objects; public class CreateDataStreamAction extends ActionType { @@ -116,10 +119,14 @@ public class CreateDataStreamAction extends ActionType { public static class TransportAction extends TransportMasterNodeAction { + private final MetadataCreateIndexService metadataCreateIndexService; + @Inject public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataCreateIndexService metaDataCreateIndexService) { super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.metadataCreateIndexService = metaDataCreateIndexService; } @Override @@ -150,7 +157,7 @@ public class CreateDataStreamAction extends ActionType { @Override public ClusterState execute(ClusterState currentState) throws Exception { - return createDataStream(currentState, request); + return createDataStream(metadataCreateIndexService, currentState, request); } @Override @@ -160,7 +167,9 @@ public class CreateDataStreamAction extends ActionType { }); } - static ClusterState createDataStream(ClusterState currentState, Request request) { + static ClusterState createDataStream(MetadataCreateIndexService metadataCreateIndexService, + ClusterState currentState, + Request request) throws Exception { if (currentState.metadata().dataStreams().containsKey(request.name)) { throw new IllegalArgumentException("data_stream [" + request.name + "] already exists"); } @@ -168,8 +177,16 @@ public class CreateDataStreamAction extends ActionType { MetadataCreateIndexService.validateIndexOrAliasName(request.name, (s1, s2) -> new IllegalArgumentException("data_stream [" + s1 + "] " + s2)); + String firstBackingIndexName = request.name + "-000001"; + CreateIndexClusterStateUpdateRequest createIndexRequest = + new CreateIndexClusterStateUpdateRequest("initialize_data_stream", firstBackingIndexName, firstBackingIndexName) + .settings(Settings.builder().put("index.hidden", true).build()); + currentState = metadataCreateIndexService.applyCreateIndexRequest(currentState, createIndexRequest, false); + IndexMetadata firstBackingIndex = currentState.metadata().index(firstBackingIndexName); + assert firstBackingIndex != null; + Metadata.Builder builder = Metadata.builder(currentState.metadata()).put( - new DataStream(request.name, request.timestampFieldName, Collections.emptyList())); + new DataStream(request.name, request.timestampFieldName, List.of(firstBackingIndex.getIndex()))); logger.info("adding data stream [{}]", request.name); return ClusterState.builder(currentState).metadata(builder).build(); } diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java index 7448986cec3..767721b48da 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/IndexAbstraction.java @@ -57,7 +57,7 @@ public interface IndexAbstraction { /** * A write index is a dedicated concrete index, that accepts all the new documents that belong to an index abstraction. - * + *

* A write index may also be a regular concrete index of a index abstraction and may therefore also be returned * by {@link #getIndices()}. An index abstraction may also not have a dedicated write index. * @@ -88,7 +88,14 @@ public interface IndexAbstraction { * An alias typically refers to many concrete indices and * may have a write index. */ - ALIAS("alias"); + ALIAS("alias"), + + /** + * An index abstraction that refers to a data stream. + * A data stream typically has multiple backing indices, the latest of which + * is the target for index requests. + */ + DATA_STREAM("data_stream"); private final String displayName; @@ -182,7 +189,7 @@ public interface IndexAbstraction { /** * Returns the unique alias metadata per concrete index. - * + *

* (note that although alias can point to the same concrete indices, each alias reference may have its own routing * and filters) */ @@ -234,7 +241,7 @@ public interface IndexAbstraction { // Validate hidden status final Map> groupedByHiddenStatus = referenceIndexMetadatas.stream() - .collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden()))); + .collect(Collectors.groupingBy(idxMeta -> Boolean.TRUE.equals(idxMeta.getAliases().get(aliasName).isHidden()))); if (isNonEmpty(groupedByHiddenStatus.get(true)) && isNonEmpty(groupedByHiddenStatus.get(false))) { List hiddenOn = groupedByHiddenStatus.get(true).stream() .map(idx -> idx.getIndex().getName()).collect(Collectors.toList()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java index caa57e16eea..1267b9d6d1b 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java @@ -79,6 +79,7 @@ import java.util.SortedMap; import java.util.TreeMap; import java.util.function.Function; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.StreamSupport; import static org.elasticsearch.common.settings.Settings.readSettingsFromStream; @@ -1443,6 +1444,7 @@ public class Metadata implements Iterable, Diffable, To }); } } + aliasAndIndexLookup.values().stream() .filter(aliasOrIndex -> aliasOrIndex.getType() == IndexAbstraction.Type.ALIAS) .forEach(alias -> ((IndexAbstraction.Alias) alias).computeAndValidateAliasProperties()); @@ -1453,15 +1455,28 @@ public class Metadata implements Iterable, Diffable, To DataStreamMetadata dsMetadata = (DataStreamMetadata) customs.get(DataStreamMetadata.TYPE); if (dsMetadata != null) { for (DataStream ds : dsMetadata.dataStreams().values()) { - if (indicesLookup.containsKey(ds.getName())) { + IndexAbstraction existing = indicesLookup.get(ds.getName()); + if (existing != null && existing.getType() != IndexAbstraction.Type.DATA_STREAM) { throw new IllegalStateException("data stream [" + ds.getName() + "] conflicts with existing index or alias"); } - SortedMap map = indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-' - if (map.size() != 0) { - throw new IllegalStateException("data stream [" + ds.getName() + - "] could create backing indices that conflict with " + map.size() + " existing index(s) or alias(s)" + - " including '" + map.firstKey() + "'"); + SortedMap potentialConflicts = + indicesLookup.subMap(ds.getName() + "-", ds.getName() + "."); // '.' is the char after '-' + if (potentialConflicts.size() != 0) { + List indexNames = ds.getIndices().stream().map(Index::getName).collect(Collectors.toList()); + List conflicts = new ArrayList<>(); + for (Map.Entry entry : potentialConflicts.entrySet()) { + if (entry.getValue().getType() != IndexAbstraction.Type.CONCRETE_INDEX || + indexNames.contains(entry.getKey()) == false) { + conflicts.add(entry.getKey()); + } + } + + if (conflicts.size() > 0) { + throw new IllegalStateException("data stream [" + ds.getName() + + "] could create backing indices that conflict with " + conflicts.size() + " existing index(s) or alias(s)" + + " including '" + conflicts.get(0) + "'"); + } } } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java index 7df3439ac77..89528dde3c5 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/CreateDataStreamRequestTests.java @@ -18,19 +18,29 @@ */ package org.elasticsearch.action.admin.indices.datastream; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.admin.indices.create.CreateIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction.Request; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataCreateIndexService; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; import org.elasticsearch.test.AbstractWireSerializingTestCase; import java.util.Collections; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCase { @@ -61,16 +71,20 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas assertThat(e.validationErrors().get(0), containsString("timestamp field name is missing")); } - public void testCreateDataStream() { + public void testCreateDataStream() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); final String dataStreamName = "my-data-stream"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); - ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(cs, req); + ClusterState newState = CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(1)); assertThat(newState.metadata().dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + assertThat(newState.metadata().index(dataStreamName + "-000001"), notNullValue()); + assertThat(newState.metadata().index(dataStreamName + "-000001").getSettings().get("index.hidden"), equalTo("true")); } - public void testCreateDuplicateDataStream() { + public void testCreateDuplicateDataStream() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); final String dataStreamName = "my-data-stream"; DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); ClusterState cs = ClusterState.builder(new ClusterName("_name")) @@ -78,16 +92,39 @@ public class CreateDataStreamRequestTests extends AbstractWireSerializingTestCas CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> CreateDataStreamAction.TransportAction.createDataStream(cs, req)); + () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req)); assertThat(e.getMessage(), containsString("data_stream [" + dataStreamName + "] already exists")); } - public void testCreateDataStreamWithInvalidName() { + public void testCreateDataStreamWithInvalidName() throws Exception { + final MetadataCreateIndexService metadataCreateIndexService = getMetadataCreateIndexService(); final String dataStreamName = "_My-da#ta- ,stream-"; ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); CreateDataStreamAction.Request req = new CreateDataStreamAction.Request(dataStreamName); IllegalArgumentException e = expectThrows(IllegalArgumentException.class, - () -> CreateDataStreamAction.TransportAction.createDataStream(cs, req)); + () -> CreateDataStreamAction.TransportAction.createDataStream(metadataCreateIndexService, cs, req)); assertThat(e.getMessage(), containsString("must not contain the following characters")); } + + private static MetadataCreateIndexService getMetadataCreateIndexService() throws Exception { + MetadataCreateIndexService s = mock(MetadataCreateIndexService.class); + when(s.applyCreateIndexRequest(any(ClusterState.class), any(CreateIndexClusterStateUpdateRequest.class), anyBoolean())) + .thenAnswer(mockInvocation -> { + ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; + CreateIndexClusterStateUpdateRequest request = (CreateIndexClusterStateUpdateRequest) mockInvocation.getArguments()[1]; + + Metadata.Builder b = Metadata.builder(currentState.metadata()) + .put(IndexMetadata.builder(request.index()) + .settings(Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(request.settings()) + .build()) + .numberOfShards(1) + .numberOfReplicas(1) + .build(), false); + return ClusterState.builder(currentState).metadata(b.build()).build(); + }); + + return s; + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java index 2aa02b8fc49..2f52a11c3a9 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetadataTests.java @@ -43,11 +43,13 @@ import org.elasticsearch.plugins.MapperPlugin; import org.elasticsearch.test.ESTestCase; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; @@ -973,7 +975,7 @@ public class MetadataTests extends ESTestCase { public void testBuilderRejectsDataStreamWithConflictingBackingIndices() { final String dataStreamName = "my-data-stream"; - final String conflictingIndex = dataStreamName + "-00001"; + final String conflictingIndex = dataStreamName + "-000001"; Metadata.Builder b = Metadata.builder() .put(IndexMetadata.builder(conflictingIndex) .settings(settings(Version.CURRENT)) @@ -987,6 +989,29 @@ public class MetadataTests extends ESTestCase { "] could create backing indices that conflict with 1 existing index(s) or alias(s) including '" + conflictingIndex + "'")); } + public void testBuilderForDataStreamWithRandomlyNumberedBackingIndices() { + final String dataStreamName = "my-data-stream"; + final List backingIndices = new ArrayList<>(); + final int numBackingIndices = randomIntBetween(2, 5); + int lastBackingIndexNum = randomIntBetween(9, 50); + Metadata.Builder b = Metadata.builder(); + for (int k = 1; k <= numBackingIndices; k++) { + IndexMetadata im = IndexMetadata.builder(String.format(Locale.ROOT, "%s-%06d", dataStreamName, lastBackingIndexNum)) + .settings(settings(Version.CURRENT)) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + b.put(im, false); + backingIndices.add(im.getIndex()); + lastBackingIndexNum = randomIntBetween(lastBackingIndexNum + 1, lastBackingIndexNum + 50); + } + + b.put(new DataStream(dataStreamName, "ts", backingIndices)); + Metadata metadata = b.build(); + assertThat(metadata.dataStreams().size(), equalTo(1)); + assertThat(metadata.dataStreams().get(dataStreamName).getName(), equalTo(dataStreamName)); + } + public void testSerialization() throws IOException { final Metadata orig = randomMetadata(); final BytesStreamOutput out = new BytesStreamOutput();