Delete backing indices with data stream (#54693) (#54976)

This commit is contained in:
Dan Hermann 2020-04-08 15:18:12 -05:00 committed by GitHub
parent 264bfaca56
commit c7f9a27d2d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 163 additions and 13 deletions

View File

@ -119,3 +119,50 @@
indices.delete_data_stream:
name: get-data-stream2
- is_true: acknowledged
---
"Delete data stream with backing indices":
- skip:
version: " - 7.99.99"
reason: "enable in 7.8+ after back-porting"
- do:
indices.create_data_stream:
name: delete-data-stream1
body:
timestamp_field: "@timestamp"
- is_true: acknowledged
- do:
indices.create:
index: test_index
body:
settings:
number_of_shards: 1
number_of_replicas: 1
- do:
indices.get:
index: "*"
expand_wildcards: all
- is_true: test_index.settings
- is_true: delete-data-stream1-000001.settings
- do:
indices.get_data_streams: {}
- match: { 0.name: delete-data-stream1 }
- match: { 0.timestamp_field: '@timestamp' }
- length: { 0.indices: 1 }
- match: { 0.indices.0.index_name: 'delete-data-stream1-000001' }
- do:
indices.delete_data_stream:
name: delete-data-stream1
- is_true: acknowledged
- do:
catch: missing
indices.get:
index: "delete-data-stream1-000001"
expand_wildcards: all

View File

@ -33,8 +33,10 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
@ -43,11 +45,14 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.Index;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@ -106,10 +111,14 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
public static class TransportAction extends TransportMasterNodeAction<Request, AcknowledgedResponse> {
private final MetadataDeleteIndexService deleteIndexService;
@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataDeleteIndexService deleteIndexService) {
super(NAME, transportService, clusterService, threadPool, actionFilters, Request::new, indexNameExpressionResolver);
this.deleteIndexService = deleteIndexService;
}
@Override
@ -139,7 +148,7 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
@Override
public ClusterState execute(ClusterState currentState) {
return removeDataStream(currentState, request);
return removeDataStream(deleteIndexService, currentState, request);
}
@Override
@ -149,7 +158,7 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
});
}
static ClusterState removeDataStream(ClusterState currentState, Request request) {
static ClusterState removeDataStream(MetadataDeleteIndexService deleteIndexService, ClusterState currentState, Request request) {
Set<String> dataStreams = new HashSet<>();
for (String dataStreamName : currentState.metadata().dataStreams().keySet()) {
if (Regex.simpleMatch(request.name, dataStreamName)) {
@ -164,10 +173,19 @@ public class DeleteDataStreamAction extends ActionType<AcknowledgedResponse> {
}
throw new ResourceNotFoundException("data_streams matching [" + request.name + "] not found");
}
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
List<String> dataStreamsToRemove = new ArrayList<>();
Set<Index> backingIndicesToRemove = new HashSet<>();
for (String dataStreamName : dataStreams) {
logger.info("removing data stream [{}]", dataStreamName);
metadata.removeDataStream(dataStreamName);
DataStream dataStream = currentState.metadata().dataStreams().get(dataStreamName);
assert dataStream != null;
backingIndicesToRemove.addAll(dataStream.getIndices());
dataStreamsToRemove.add(dataStreamName);
}
currentState = deleteIndexService.deleteIndices(currentState, backingIndicesToRemove);
Metadata.Builder metadata = Metadata.builder(currentState.metadata());
for (String ds : dataStreamsToRemove) {
logger.info("removing data stream [{}]", ds);
metadata.removeDataStream(ds);
}
return ClusterState.builder(currentState).metadata(metadata).build();
}

View File

@ -19,19 +19,32 @@
package org.elasticsearch.action.admin.indices.datastream;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction.Request;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.DataStream;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.metadata.MetadataDeleteIndexService;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import java.util.Collections;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.stream.Collectors;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCase<Request> {
@ -61,13 +74,21 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
public void testDeleteDataStream() {
final String dataStreamName = "my-data-stream";
DataStream existingDataStream = new DataStream(dataStreamName, "timestamp", Collections.emptyList());
ClusterState cs = ClusterState.builder(new ClusterName("_name"))
.metadata(Metadata.builder().dataStreams(Collections.singletonMap(dataStreamName, existingDataStream)).build()).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
final List<String> otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz"));
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(cs, req);
ClusterState cs = getClusterState(
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, org.elasticsearch.common.collect.List.of(
String.format(Locale.ROOT, "%s-%06d", dataStreamName, 1),
String.format(Locale.ROOT, "%s-%06d", dataStreamName, 2)))),
otherIndices);
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
assertThat(newState.metadata().indices().size(), equalTo(otherIndices.size()));
for (String indexName : otherIndices) {
assertThat(newState.metadata().indices().get(indexName).getIndex().getName(), equalTo(indexName));
}
}
public void testDeleteNonexistentDataStream() {
@ -75,7 +96,71 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
ClusterState cs = ClusterState.builder(new ClusterName("_name")).build();
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
ResourceNotFoundException e = expectThrows(ResourceNotFoundException.class,
() -> DeleteDataStreamAction.TransportAction.removeDataStream(cs, req));
() -> DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req));
assertThat(e.getMessage(), containsString("data_streams matching [" + dataStreamName + "] not found"));
}
@SuppressWarnings("unchecked")
private static MetadataDeleteIndexService getMetadataDeleteIndexService() {
MetadataDeleteIndexService s = mock(MetadataDeleteIndexService.class);
when(s.deleteIndices(any(ClusterState.class), any(Set.class)))
.thenAnswer(mockInvocation -> {
ClusterState currentState = (ClusterState) mockInvocation.getArguments()[0];
Set<Index> indices = (Set<Index>) mockInvocation.getArguments()[1];
final Metadata.Builder b = Metadata.builder(currentState.metadata());
for (Index index : indices) {
b.remove(index.getName());
}
return ClusterState.builder(currentState).metadata(b.build()).build();
});
return s;
}
/**
* Constructs {@code ClusterState} with the specified data streams and indices.
*
* @param dataStreamAndIndexNames The names of the data streams to create with their respective backing indices
* @param indexNames The names of indices to create that do not back any data streams
*/
private static ClusterState getClusterState(List<Tuple<String, List<String>>> dataStreamAndIndexNames, List<String> indexNames) {
Metadata.Builder builder = Metadata.builder();
List<IndexMetadata> allIndices = new ArrayList<>();
for (Tuple<String, List<String>> dsTuple : dataStreamAndIndexNames) {
List<IndexMetadata> backingIndices = new ArrayList<>();
for (String indexName : dsTuple.v2()) {
backingIndices.add(createIndexMetadata(indexName, true));
}
allIndices.addAll(backingIndices);
DataStream ds = new DataStream(dsTuple.v1(), "@timestamp",
backingIndices.stream().map(IndexMetadata::getIndex).collect(Collectors.toList()));
builder.put(ds);
}
for (String indexName : indexNames) {
allIndices.add(createIndexMetadata(indexName, false));
}
for (IndexMetadata index : allIndices) {
builder.put(index, false);
}
return ClusterState.builder(new ClusterName("_name")).metadata(builder).build();
}
private static IndexMetadata createIndexMetadata(String name, boolean hidden) {
Settings.Builder b = Settings.builder()
.put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT)
.put("index.hidden", hidden);
return IndexMetadata.builder(name)
.settings(b)
.numberOfShards(1)
.numberOfReplicas(1)
.build();
}
}