[7.x] Delete index API properly handles backing indices for data streams (#55971)
This commit is contained in:
parent
07a3d808ee
commit
bf89e485fc
|
@ -0,0 +1,99 @@
|
|||
---
|
||||
"Delete backing index on data stream":
|
||||
- skip:
|
||||
version: " - 7.99.99"
|
||||
reason: "enable in 7.8+ after backporting"
|
||||
|
||||
- do:
|
||||
indices.create_data_stream:
|
||||
name: simple-data-stream
|
||||
body:
|
||||
timestamp_field: "@timestamp"
|
||||
- is_true: acknowledged
|
||||
|
||||
# rollover data stream to create new backing index
|
||||
- do:
|
||||
indices.rollover:
|
||||
alias: "simple-data-stream"
|
||||
|
||||
- match: { old_index: simple-data-stream-000001 }
|
||||
- match: { new_index: simple-data-stream-000002 }
|
||||
- match: { rolled_over: true }
|
||||
- match: { dry_run: false }
|
||||
|
||||
# ensure new index is created
|
||||
- do:
|
||||
indices.exists:
|
||||
index: simple-data-stream-000002
|
||||
|
||||
- is_true: ''
|
||||
|
||||
- do:
|
||||
indices.delete:
|
||||
index: simple-data-stream-000001
|
||||
|
||||
- do:
|
||||
indices.exists:
|
||||
index: simple-data-stream-000001
|
||||
|
||||
- is_false: ''
|
||||
|
||||
- do:
|
||||
indices.get_data_streams:
|
||||
name: "*"
|
||||
- match: { 0.name: simple-data-stream }
|
||||
- match: { 0.timestamp_field: '@timestamp' }
|
||||
- match: { 0.generation: 2 }
|
||||
- length: { 0.indices: 1 }
|
||||
- match: { 0.indices.0.index_name: 'simple-data-stream-000002' }
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
name: simple-data-stream
|
||||
- is_true: acknowledged
|
||||
|
||||
---
|
||||
"Attempt to delete write index on data stream is rejected":
|
||||
- skip:
|
||||
version: " - 7.99.99"
|
||||
reason: "enable in 7.8+ after backporting"
|
||||
|
||||
- do:
|
||||
indices.create_data_stream:
|
||||
name: simple-data-stream
|
||||
body:
|
||||
timestamp_field: "@timestamp"
|
||||
- is_true: acknowledged
|
||||
|
||||
# rollover data stream to create new backing index
|
||||
- do:
|
||||
indices.rollover:
|
||||
alias: "simple-data-stream"
|
||||
|
||||
- match: { old_index: simple-data-stream-000001 }
|
||||
- match: { new_index: simple-data-stream-000002 }
|
||||
- match: { rolled_over: true }
|
||||
- match: { dry_run: false }
|
||||
|
||||
# ensure new index is created
|
||||
- do:
|
||||
indices.exists:
|
||||
index: simple-data-stream-000002
|
||||
|
||||
- is_true: ''
|
||||
|
||||
- do:
|
||||
catch: bad_request
|
||||
indices.delete:
|
||||
index: simple-data-stream-000002
|
||||
|
||||
- do:
|
||||
indices.exists:
|
||||
index: simple-data-stream-000002
|
||||
|
||||
- is_true: ''
|
||||
|
||||
- do:
|
||||
indices.delete_data_stream:
|
||||
name: simple-data-stream
|
||||
- is_true: acknowledged
|
|
@ -47,6 +47,8 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
|
|||
this.timeStampField = timeStampField;
|
||||
this.indices = indices;
|
||||
this.generation = generation;
|
||||
assert indices.size() > 0;
|
||||
assert indices.get(indices.size() - 1).getName().equals(getBackingIndexName(name, generation));
|
||||
}
|
||||
|
||||
public DataStream(String name, String timeStampField, List<Index> indices) {
|
||||
|
@ -84,6 +86,19 @@ public final class DataStream extends AbstractDiffable<DataStream> implements To
|
|||
return new DataStream(name, timeStampField, backingIndices, generation + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes the specified backing index and returns a new {@code DataStream} instance with
|
||||
* the remaining backing indices.
|
||||
*
|
||||
* @param index the backing index to remove
|
||||
* @return new {@code DataStream} instance with the remaining backing indices
|
||||
*/
|
||||
public DataStream removeBackingIndex(Index index) {
|
||||
List<Index> backingIndices = new ArrayList<>(indices);
|
||||
backingIndices.remove(index);
|
||||
return new DataStream(name, timeStampField, backingIndices, generation);
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates the name of the index that conforms to the naming convention for backing indices
|
||||
* on data streams given the specified data stream name and generation.
|
||||
|
|
|
@ -42,10 +42,11 @@ import org.elasticsearch.snapshots.SnapshotInProgressException;
|
|||
import org.elasticsearch.snapshots.SnapshotsService;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import static java.util.stream.Collectors.toSet;
|
||||
|
||||
/**
|
||||
* Deletes indices.
|
||||
*/
|
||||
|
@ -91,7 +92,21 @@ public class MetadataDeleteIndexService {
|
|||
*/
|
||||
public ClusterState deleteIndices(ClusterState currentState, Set<Index> indices) {
|
||||
final Metadata meta = currentState.metadata();
|
||||
final Set<Index> indicesToDelete = indices.stream().map(i -> meta.getIndexSafe(i).getIndex()).collect(toSet());
|
||||
final Set<Index> indicesToDelete = new HashSet<>();
|
||||
final Map<Index, DataStream> backingIndices = new HashMap<>();
|
||||
for (Index index : indices) {
|
||||
IndexMetadata im = meta.getIndexSafe(index);
|
||||
IndexAbstraction.DataStream parent = meta.getIndicesLookup().get(im.getIndex().getName()).getParentDataStream();
|
||||
if (parent != null) {
|
||||
if (parent.getWriteIndex().equals(im)) {
|
||||
throw new IllegalArgumentException("index [" + index.getName() + "] is the write index for data stream [" +
|
||||
parent.getName() + "] and cannot be deleted");
|
||||
} else {
|
||||
backingIndices.put(index, parent.getDataStream());
|
||||
}
|
||||
}
|
||||
indicesToDelete.add(im.getIndex());
|
||||
}
|
||||
|
||||
// Check if index deletion conflicts with any running snapshots
|
||||
Set<Index> snapshottingIndices = SnapshotsService.snapshottingIndices(currentState, indicesToDelete);
|
||||
|
@ -112,6 +127,10 @@ public class MetadataDeleteIndexService {
|
|||
routingTableBuilder.remove(indexName);
|
||||
clusterBlocksBuilder.removeIndexBlocks(indexName);
|
||||
metadataBuilder.remove(indexName);
|
||||
if (backingIndices.containsKey(index)) {
|
||||
DataStream parent = backingIndices.get(index);
|
||||
metadataBuilder.put(parent.removeBackingIndex(index));
|
||||
}
|
||||
}
|
||||
// add tombstones to the cluster state for each deleted index
|
||||
final IndexGraveyard currentGraveyard = graveyardBuilder.addTombstones(indices).build(settings);
|
||||
|
|
|
@ -74,7 +74,8 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
|
|||
public void testDeleteDataStream() {
|
||||
final String dataStreamName = "my-data-stream";
|
||||
final List<String> otherIndices = randomSubsetOf(org.elasticsearch.common.collect.List.of("foo", "bar", "baz"));
|
||||
ClusterState cs = getClusterState(org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 2)), otherIndices);
|
||||
ClusterState cs = getClusterStateWithDataStreams(
|
||||
org.elasticsearch.common.collect.List.of(new Tuple<>(dataStreamName, 2)), otherIndices);
|
||||
DeleteDataStreamAction.Request req = new DeleteDataStreamAction.Request(dataStreamName);
|
||||
ClusterState newState = DeleteDataStreamAction.TransportAction.removeDataStream(getMetadataDeleteIndexService(), cs, req);
|
||||
assertThat(newState.metadata().dataStreams().size(), equalTo(0));
|
||||
|
@ -118,7 +119,7 @@ public class DeleteDataStreamRequestTests extends AbstractWireSerializingTestCas
|
|||
* @param dataStreams The names of the data streams to create with their respective number of backing indices
|
||||
* @param indexNames The names of indices to create that do not back any data streams
|
||||
*/
|
||||
private static ClusterState getClusterState(List<Tuple<String, Integer>> dataStreams, List<String> indexNames) {
|
||||
public static ClusterState getClusterStateWithDataStreams(List<Tuple<String, Integer>> dataStreams, List<String> indexNames) {
|
||||
Metadata.Builder builder = Metadata.builder();
|
||||
|
||||
List<IndexMetadata> allIndices = new ArrayList<>();
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.elasticsearch.action.admin.indices.datastream;
|
|||
|
||||
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamsAction.Response;
|
||||
import org.elasticsearch.cluster.metadata.DataStream;
|
||||
import org.elasticsearch.cluster.metadata.DataStreamTests;
|
||||
import org.elasticsearch.common.io.stream.Writeable;
|
||||
import org.elasticsearch.common.xcontent.XContentParser;
|
||||
import org.elasticsearch.common.xcontent.XContentParser.Token;
|
||||
|
@ -29,8 +30,6 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.elasticsearch.cluster.metadata.DataStreamTests.randomIndexInstances;
|
||||
|
||||
public class GetDataStreamsResponseTests extends AbstractSerializingTestCase<Response> {
|
||||
|
||||
@Override
|
||||
|
@ -54,7 +53,7 @@ public class GetDataStreamsResponseTests extends AbstractSerializingTestCase<Res
|
|||
int numDataStreams = randomIntBetween(0, 8);
|
||||
List<DataStream> dataStreams = new ArrayList<>();
|
||||
for (int i = 0; i < numDataStreams; i++) {
|
||||
dataStreams.add(new DataStream(randomAlphaOfLength(4), randomAlphaOfLength(4), randomIndexInstances()));
|
||||
dataStreams.add(DataStreamTests.randomInstance());
|
||||
}
|
||||
return new Response(dataStreams);
|
||||
}
|
||||
|
|
|
@ -78,4 +78,24 @@ public class DataStreamTests extends AbstractSerializingTestCase<DataStream> {
|
|||
assertTrue(rolledDs.getIndices().containsAll(ds.getIndices()));
|
||||
assertTrue(rolledDs.getIndices().contains(newWriteIndex));
|
||||
}
|
||||
|
||||
public void testRemoveBackingIndex() {
|
||||
int numBackingIndices = randomIntBetween(2, 32);
|
||||
int indexToRemove = randomIntBetween(1, numBackingIndices - 1);
|
||||
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
|
||||
List<Index> indices = new ArrayList<>(numBackingIndices);
|
||||
for (int k = 1; k <= numBackingIndices; k++) {
|
||||
indices.add(new Index(DataStream.getBackingIndexName(dataStreamName, k), UUIDs.randomBase64UUID(random())));
|
||||
}
|
||||
DataStream original = new DataStream(dataStreamName, "@timestamp", indices);
|
||||
DataStream updated = original.removeBackingIndex(indices.get(indexToRemove - 1));
|
||||
assertThat(updated.getName(), equalTo(original.getName()));
|
||||
assertThat(updated.getGeneration(), equalTo(original.getGeneration()));
|
||||
assertThat(updated.getTimeStampField(), equalTo(original.getTimeStampField()));
|
||||
assertThat(updated.getIndices().size(), equalTo(numBackingIndices - 1));
|
||||
for (int k = 0; k < (numBackingIndices - 1); k++) {
|
||||
assertThat(updated.getIndices().get(k), equalTo(original.getIndices().get(k < (indexToRemove - 1) ? k : k + 1)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
package org.elasticsearch.cluster.metadata;
|
||||
|
||||
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests;
|
||||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.SnapshotsInProgress;
|
||||
|
@ -26,6 +27,9 @@ import org.elasticsearch.cluster.block.ClusterBlocks;
|
|||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.collect.List;
|
||||
import org.elasticsearch.common.collect.Set;
|
||||
import org.elasticsearch.common.collect.Tuple;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.Index;
|
||||
import org.elasticsearch.index.IndexNotFoundException;
|
||||
|
@ -36,9 +40,15 @@ import org.elasticsearch.snapshots.SnapshotInProgressException;
|
|||
import org.elasticsearch.snapshots.SnapshotInfoTests;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
import org.elasticsearch.test.VersionUtils;
|
||||
import org.hamcrest.core.IsNull;
|
||||
import org.junit.Before;
|
||||
|
||||
import java.util.Locale;
|
||||
|
||||
import static java.util.Collections.singleton;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.hamcrest.CoreMatchers.containsString;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
|
@ -46,8 +56,18 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
|
||||
public class MetadataDeleteIndexServiceTests extends ESTestCase {
|
||||
private final AllocationService allocationService = mock(AllocationService.class);
|
||||
private final MetadataDeleteIndexService service = new MetadataDeleteIndexService(Settings.EMPTY, null, allocationService);
|
||||
private AllocationService allocationService;
|
||||
private MetadataDeleteIndexService service;
|
||||
|
||||
@Override
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
allocationService = mock(AllocationService.class);
|
||||
when(allocationService.reroute(any(ClusterState.class), any(String.class)))
|
||||
.thenAnswer(mockInvocation -> mockInvocation.getArguments()[0]);
|
||||
service = new MetadataDeleteIndexService(Settings.EMPTY, null, allocationService);
|
||||
}
|
||||
|
||||
public void testDeleteMissing() {
|
||||
Index index = new Index("missing", "doesn't matter");
|
||||
|
@ -92,6 +112,35 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
|
|||
verify(allocationService).reroute(any(ClusterState.class), any(String.class));
|
||||
}
|
||||
|
||||
public void testDeleteBackingIndexForDataStream() {
|
||||
int numBackingIndices = randomIntBetween(2, 5);
|
||||
String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
|
||||
ClusterState before = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(
|
||||
List.of(new Tuple<>(dataStreamName, numBackingIndices)), List.of());
|
||||
|
||||
int numIndexToDelete = randomIntBetween(1, numBackingIndices - 1);
|
||||
|
||||
Index indexToDelete = before.metadata().index(DataStream.getBackingIndexName(dataStreamName, numIndexToDelete)).getIndex();
|
||||
ClusterState after = service.deleteIndices(before, Set.of(indexToDelete));
|
||||
|
||||
assertThat(after.metadata().getIndices().get(indexToDelete.getName()), IsNull.nullValue());
|
||||
assertThat(after.metadata().getIndices().size(), equalTo(numBackingIndices - 1));
|
||||
assertThat(after.metadata().getIndices().get(DataStream.getBackingIndexName(dataStreamName, numIndexToDelete)), IsNull.nullValue());
|
||||
}
|
||||
|
||||
public void testDeleteCurrentWriteIndexForDataStream() {
|
||||
int numBackingIndices = randomIntBetween(1, 5);
|
||||
String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
|
||||
ClusterState before = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(
|
||||
List.of(new Tuple<>(dataStreamName, numBackingIndices)), List.of());
|
||||
|
||||
Index indexToDelete = before.metadata().index(DataStream.getBackingIndexName(dataStreamName, numBackingIndices)).getIndex();
|
||||
Exception e = expectThrows(IllegalArgumentException.class, () -> service.deleteIndices(before, Set.of(indexToDelete)));
|
||||
|
||||
assertThat(e.getMessage(), containsString("index [" + indexToDelete.getName() + "] is the write index for data stream [" +
|
||||
dataStreamName + "] and cannot be deleted"));
|
||||
}
|
||||
|
||||
private ClusterState clusterState(String index) {
|
||||
IndexMetadata indexMetadata = IndexMetadata.builder(index)
|
||||
.settings(Settings.builder().put("index.version.created", VersionUtils.randomVersion(random())))
|
||||
|
|
Loading…
Reference in New Issue