[7.x] Prohibit closing the write index for a data stream (#57740)

This commit is contained in:
Dan Hermann 2020-06-05 11:14:43 -05:00 committed by GitHub
parent bc921ea17c
commit 3fe93e24a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 84 additions and 0 deletions

View File

@ -119,3 +119,34 @@
- match: { indices.index_2.closed: true }
- match: { indices.index_3.closed: true }
---
"Close write index for data stream fails":
- skip:
version: " - 7.99.99"
reason: "data streams not yet released"
features: allowed_warnings
- do:
allowed_warnings:
- "index template [my-template1] has index patterns [simple-data-stream1] matching patterns from existing older templates [global] with patterns (global => [*]); this template [my-template1] will take precedence during new index creation"
indices.put_index_template:
name: my-template1
body:
index_patterns: [simple-data-stream1]
data_stream:
timestamp_field: '@timestamp'
- do:
indices.create_data_stream:
name: simple-data-stream1
- is_true: acknowledged
- do:
catch: bad_request
indices.close:
index: "simple-data-stream1-000001"
- do:
indices.delete_data_stream:
name: simple-data-stream1
- is_true: acknowledged

View File

@ -83,6 +83,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@ -136,6 +137,18 @@ public class MetadataIndexStateService {
if (concreteIndices == null || concreteIndices.length == 0) {
throw new IllegalArgumentException("Index name is required");
}
List<String> writeIndices = new ArrayList<>();
SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
for (Index index : concreteIndices) {
IndexAbstraction ia = lookup.get(index.getName());
if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
writeIndices.add(index.getName());
}
}
if (writeIndices.size() > 0) {
throw new IllegalArgumentException("cannot close the following data stream write indices [" +
Strings.collectionToCommaDelimitedString(writeIndices) + "]");
}
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {

View File

@ -20,8 +20,10 @@
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResult;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamRequestTests;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
@ -37,8 +39,11 @@ import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.UnassignedInfo;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
@ -50,11 +55,15 @@ import org.elasticsearch.snapshots.SnapshotInProgressException;
import org.elasticsearch.snapshots.SnapshotInfoTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;
import org.hamcrest.CoreMatchers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
@ -75,6 +84,8 @@ import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MetadataIndexStateServiceTests extends ESTestCase {
@ -380,6 +391,35 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
assertThat(failedIndices, equalTo(disappearedIndices));
}
public void testCloseCurrentWriteIndexForDataStream() {
int numDataStreams = randomIntBetween(1, 3);
List<Tuple<String, Integer>> dataStreamsToCreate = new ArrayList<>();
List<String> writeIndices = new ArrayList<>();
for (int k = 0; k < numDataStreams; k++) {
String dataStreamName = randomAlphaOfLength(6).toLowerCase(Locale.ROOT);
int numBackingIndices = randomIntBetween(1, 5);
dataStreamsToCreate.add(new Tuple<>(dataStreamName, numBackingIndices));
writeIndices.add(DataStream.getBackingIndexName(dataStreamName, numBackingIndices));
}
ClusterState cs = DeleteDataStreamRequestTests.getClusterStateWithDataStreams(dataStreamsToCreate,
org.elasticsearch.common.collect.List.of());
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(cs);
List<String> indicesToDelete = randomSubsetOf(randomIntBetween(1, numDataStreams), writeIndices);
Index[] indicesToDeleteArray = new Index[indicesToDelete.size()];
for (int k = 0; k < indicesToDelete.size(); k++) {
Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
indicesToDeleteArray[k] = indexToDelete;
}
MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null);
CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +
Strings.collectionToCommaDelimitedString(indicesToDelete) + "]"));
}
public static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
return addIndex(state, index, numShards, numReplicas, IndexMetadata.State.OPEN, null);
}