From c7f9a27d2dcbf234e15bac22d6df89d6aaba0cba Mon Sep 17 00:00:00 2001 From: Dan Hermann Date: Wed, 8 Apr 2020 15:18:12 -0500 Subject: [PATCH] Delete backing indices with data stream (#54693) (#54976) --- .../test/indices.data_stream/10_basic.yml | 47 +++++++++ .../datastream/DeleteDataStreamAction.java | 30 ++++-- .../DeleteDataStreamRequestTests.java | 99 +++++++++++++++++-- 3 files changed, 163 insertions(+), 13 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml index 00aa3f7efd7..341535a2495 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.data_stream/10_basic.yml @@ -119,3 +119,50 @@ indices.delete_data_stream: name: get-data-stream2 - is_true: acknowledged + +--- +"Delete data stream with backing indices": + - skip: + version: " - 7.99.99" + reason: "enable in 7.8+ after back-porting" + + - do: + indices.create_data_stream: + name: delete-data-stream1 + body: + timestamp_field: "@timestamp" + - is_true: acknowledged + + - do: + indices.create: + index: test_index + body: + settings: + number_of_shards: 1 + number_of_replicas: 1 + + - do: + indices.get: + index: "*" + expand_wildcards: all + + - is_true: test_index.settings + - is_true: delete-data-stream1-000001.settings + + - do: + indices.get_data_streams: {} + - match: { 0.name: delete-data-stream1 } + - match: { 0.timestamp_field: '@timestamp' } + - length: { 0.indices: 1 } + - match: { 0.indices.0.index_name: 'delete-data-stream1-000001' } + + - do: + indices.delete_data_stream: + name: delete-data-stream1 + - is_true: acknowledged + + - do: + catch: missing + indices.get: + index: "delete-data-stream1-000001" + expand_wildcards: all diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java index fb29a8c2d84..6ee027639b1 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamAction.java @@ -33,8 +33,10 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateUpdateTask; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.DataStream; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; import org.elasticsearch.common.Strings; @@ -43,11 +45,14 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.regex.Regex; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.Index; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; import java.io.IOException; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Objects; import java.util.Set; @@ -106,10 +111,14 @@ public class DeleteDataStreamAction extends ActionType { public static class TransportAction extends TransportMasterNodeAction { + private final MetadataDeleteIndexService deleteIndexService; + @Inject public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool, - ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) { + ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver, + MetadataDeleteIndexService deleteIndexService) { super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver); + this.deleteIndexService = deleteIndexService; } @Override @@ -139,7 +148,7 @@ public class DeleteDataStreamAction extends ActionType { @Override public ClusterState execute(ClusterState currentState) { - return removeDataStream(currentState, request); + return removeDataStream(deleteIndexService, currentState, request); } @Override @@ -149,7 +158,7 @@ public class DeleteDataStreamAction extends ActionType { }); } - static ClusterState removeDataStream(ClusterState currentState, Request request) { + static ClusterState removeDataStream(MetadataDeleteIndexService deleteIndexService, ClusterState currentState, Request request) { Set dataStreams = new HashSet<>(); for (String dataStreamName : currentState.metadata().dataStreams().keySet()) { if (Regex.simpleMatch(request.name, dataStreamName)) { @@ -164,10 +173,19 @@ public class DeleteDataStreamAction extends ActionType { } throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found"); } - Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + List dataStreamsToRemove = new ArrayList<>(); + Set backingIndicesToRemove = new HashSet<>(); for (String dataStreamName : dataStreams) { - logger.info("removing data stream [{}]", dataStreamName); - metadata.removeDataStream(dataStreamName); + DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName); + assert dataStream != null; + backingIndicesToRemove.addAll(dataStream.getIndices()); + dataStreamsToRemove.add(dataStreamName); + } + currentState = deleteIndexService.deleteIndices(currentState, backingIndicesToRemove); + Metadata.Builder metadata = Metadata.builder(currentState.metadata()); + for (String ds : dataStreamsToRemove) { + logger.info("removing data stream [{}]", ds); + metadata.removeDataStream(ds); } return ClusterState.builder(currentState).metadata(metadata).build(); } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java index db84d910912..1c5255d5f7b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/datastream/DeleteDataStreamRequestTests.java @@ -19,19 +19,32 @@ package org.elasticsearch.action.admin.indices.datastream; import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.DataStream; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; +import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService; +import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.Index; import org.elasticsearch.test.AbstractWireSerializingTestCase; -import java.util.Collections; +import java.util.ArrayList; +import java.util.List; +import java.util.Locale; +import java.util.Set; +import java.util.stream.Collectors; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase { @@ -61,13 +74,21 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas public void testDeleteDataStream() { final String dataStreamName = "my-data-stream"; - DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList()); - ClusterState cs = ClusterState.builder(new ClusterName("_name")) - .metadata(Metadata.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build(); - DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + final List otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz")); - ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(cs, req); + ClusterState cs = getClusterState( + org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, org.elasticsearch.common.collect.List.of( + String.format(Locale.ROOT, "%s-%06d", dataStreamName, 1), + String.format(Locale.ROOT, "%s-%06d", dataStreamName, 2)))), + otherIndices); + + DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); + ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req); assertThat(newState.metadata().dataStreams().size(), equalTo(0)); + assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size())); + for (String indexName : otherIndices) { + assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName)); + } } public void testDeleteNonexistentDataStream() { @@ -75,7 +96,71 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas ClusterState cs = ClusterState.builder(new ClusterName("_name")).build(); DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName); ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class, - () -> DeleteDataStreamAction.TransportAction.removeDataStream(cs, req)); + () -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req)); assertThat(e.getMessage(), containsString("data_streams matching [" + dataStreamName + "] not found")); } + + @SuppressWarnings("unchecked") + private static MetadataDeleteIndexService getMetadataDeleteIndexService() { + MetadataDeleteIndexService s = mock(MetadataDeleteIndexService.class); + when(s.deleteIndices(any(ClusterState.class), any(Set.class))) + .thenAnswer(mockInvocation -> { + ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0]; + Set indices = (Set) mockInvocation.getArguments()[1]; + + final Metadata.Builder b = Metadata.builder(currentState.metadata()); + for (Index index : indices) { + b.remove(index.getName()); + } + + return ClusterState.builder(currentState).metadata(b.build()).build(); + }); + + return s; + } + + /** + * Constructs {@code ClusterState} with the specified data streams and indices. + * + * @param dataStreamAndIndexNames The names of the data streams to create with their respective backing indices + * @param indexNames The names of indices to create that do not back any data streams + */ + private static ClusterState getClusterState(List>> dataStreamAndIndexNames, List indexNames) { + Metadata.Builder builder = Metadata.builder(); + + List allIndices = new ArrayList<>(); + for (Tuple> dsTuple : dataStreamAndIndexNames) { + List backingIndices = new ArrayList<>(); + for (String indexName : dsTuple.v2()) { + backingIndices.add(createIndexMetadata(indexName, true)); + } + allIndices.addAll(backingIndices); + + DataStream ds = new DataStream(dsTuple.v1(), "@timestamp", + backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList())); + builder.put(ds); + } + + for (String indexName : indexNames) { + allIndices.add(createIndexMetadata(indexName, false)); + } + + for (IndexMetadata index : allIndices) { + builder.put(index, false); + } + + return ClusterState.builder(new ClusterName("_name")).metadata(builder).build(); + } + + private static IndexMetadata createIndexMetadata(String name, boolean hidden) { + Settings.Builder b = Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put("index.hidden", hidden); + + return IndexMetadata.builder(name) + .settings(b) + .numberOfShards(1) + .numberOfReplicas(1) + .build(); + } }