From e149b0852e206574faad54ab08f8a61827f73e8d Mon Sep 17 00:00:00 2001 From: Tanguy Leroux Date: Mon, 7 Jan 2019 16:44:59 +0100 Subject: [PATCH] [Close Index API] Add unique UUID to ClusterBlock (#36775) This commit adds a unique id to cluster blocks, so that they can be uniquely identified if needed. This is important for the Close Index API where multiple concurrent closing requests can be executed at the same time. By adding a UUID to the cluster block, we can generate unique "closing block" that can later be verified on shards and then checked again from the cluster state before closing the index. When the verification on shard is done, the closing block is replaced by the regular INDEX_CLOSED_BLOCK instance. If something goes wrong, calling the Open Index API will remove the block. Related to #33888 --- .../test/rest/WaitForRefreshAndCloseIT.java | 2 - ...TransportVerifyShardBeforeCloseAction.java | 39 ++- .../cluster/block/ClusterBlock.java | 61 +++-- .../cluster/block/ClusterBlocks.java | 38 +++ .../metadata/MetaDataIndexStateService.java | 251 +++++++++++------- ...portVerifyShardBeforeCloseActionTests.java | 16 +- .../cluster/block/ClusterBlockTests.java | 161 ++++++++--- .../MetaDataIndexStateServiceTests.java | 133 ++++++---- .../MetaDataIndexStateServiceUtils.java | 17 +- .../indices/cluster/ClusterStateChanges.java | 10 +- .../indices/state/CloseIndexIT.java | 93 ++++++- .../indices/state/OpenCloseIndexIT.java | 21 +- .../indices/state/ReopenWhileClosingIT.java | 167 ++++++++++++ .../DedicatedClusterSnapshotRestoreIT.java | 2 +- .../SharedClusterSnapshotRestoreIT.java | 4 +- .../hamcrest/ElasticsearchAssertions.java | 22 +- 16 files changed, 780 insertions(+), 257 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java diff --git a/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java b/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java index 2292def6d4a..52b1a8c52b5 100644 --- a/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java +++ b/distribution/archives/integ-test-zip/src/test/java/org/elasticsearch/test/rest/WaitForRefreshAndCloseIT.java @@ -20,7 +20,6 @@ package org.elasticsearch.test.rest; import org.apache.http.util.EntityUtils; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.client.Request; @@ -41,7 +40,6 @@ import static org.hamcrest.Matchers.instanceOf; /** * Tests that wait for refresh is fired if the index is closed. */ -@LuceneTestCase.AwaitsFix(bugUrl = "to be created") public class WaitForRefreshAndCloseIT extends ESRestTestCase { @Before public void setupIndex() throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index f603f92a718..f08f6ea7dff 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -29,9 +29,10 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; -import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.shard.IndexShard; @@ -41,13 +42,14 @@ import org.elasticsearch.tasks.TaskId; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.TransportService; +import java.io.IOException; +import java.util.Objects; import java.util.function.Consumer; public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> { public static final String NAME = CloseIndexAction.NAME + "[s]"; - public static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK; @Inject public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService, @@ -83,25 +85,25 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA @Override protected PrimaryResult shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary) throws Exception { - executeShardOperation(primary); + executeShardOperation(shardRequest, primary); return new PrimaryResult<>(shardRequest, new ReplicationResponse()); } @Override protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception { - executeShardOperation(replica); + executeShardOperation(shardRequest, replica); return new ReplicaResult(); } - private void executeShardOperation(final IndexShard indexShard) { + private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) { final ShardId shardId = indexShard.shardId(); if (indexShard.getActiveOperationsCount() != 0) { throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing"); } final ClusterBlocks clusterBlocks = clusterService.state().blocks(); - if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) { - throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing"); + if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) { + throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing"); } final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo(); @@ -139,17 +141,36 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA public static class ShardRequest extends ReplicationRequest { + private ClusterBlock clusterBlock; + ShardRequest(){ } - public ShardRequest(final ShardId shardId, final TaskId parentTaskId) { + public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) { super(shardId); + this.clusterBlock = Objects.requireNonNull(clusterBlock); setParentTask(parentTaskId); } @Override public String toString() { - return "verify shard before close {" + shardId + "}"; + return "verify shard " + shardId + " before close with block " + clusterBlock; + } + + @Override + public void readFrom(final StreamInput in) throws IOException { + super.readFrom(in); + clusterBlock = ClusterBlock.readClusterBlock(in); + } + + @Override + public void writeTo(final StreamOutput out) throws IOException { + super.writeTo(out); + clusterBlock.writeTo(out); + } + + public ClusterBlock clusterBlock() { + return clusterBlock; } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java index fafd3977220..5713462b921 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlock.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster.block; +import org.elasticsearch.Version; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; @@ -30,29 +32,31 @@ import java.io.IOException; import java.util.ArrayList; import java.util.EnumSet; import java.util.Locale; +import java.util.Objects; public class ClusterBlock implements Streamable, ToXContentFragment { private int id; - + private @Nullable String uuid; private String description; - private EnumSet levels; - private boolean retryable; - private boolean disableStatePersistence = false; - private boolean allowReleaseResources; - private RestStatus status; - ClusterBlock() { + private ClusterBlock() { } - public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources, - RestStatus status, EnumSet levels) { + public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, + boolean allowReleaseResources, RestStatus status, EnumSet levels) { + this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels); + } + + public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence, + boolean allowReleaseResources, RestStatus status, EnumSet levels) { this.id = id; + this.uuid = uuid; this.description = description; this.retryable = retryable; this.disableStatePersistence = disableStatePersistence; @@ -65,6 +69,10 @@ public class ClusterBlock implements Streamable, ToXContentFragment { return this.id; } + public String uuid() { + return uuid; + } + public String description() { return this.description; } @@ -104,6 +112,9 @@ public class ClusterBlock implements Streamable, ToXContentFragment { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Integer.toString(id)); + if (uuid != null) { + builder.field("uuid", uuid); + } builder.field("description", description); builder.field("retryable", retryable); if (disableStatePersistence) { @@ -127,6 +138,11 @@ public class ClusterBlock implements Streamable, ToXContentFragment { @Override public void readFrom(StreamInput in) throws IOException { id = in.readVInt(); + if (in.getVersion().onOrAfter(Version.V_7_0_0)) { + uuid = in.readOptionalString(); + } else { + uuid = null; + } description = in.readString(); final int len = in.readVInt(); ArrayList levels = new ArrayList<>(len); @@ -143,6 +159,9 @@ public class ClusterBlock implements Streamable, ToXContentFragment { @Override public void writeTo(StreamOutput out) throws IOException { out.writeVInt(id); + if (out.getVersion().onOrAfter(Version.V_7_0_0)) { + out.writeOptionalString(uuid); + } out.writeString(description); out.writeVInt(levels.size()); for (ClusterBlockLevel level : levels) { @@ -157,7 +176,11 @@ public class ClusterBlock implements Streamable, ToXContentFragment { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(id).append(",").append(description).append(", blocks "); + sb.append(id).append(","); + if (uuid != null) { + sb.append(uuid).append(','); + } + sb.append(description).append(", blocks "); String delimiter = ""; for (ClusterBlockLevel level : levels) { sb.append(delimiter).append(level.name()); @@ -168,19 +191,19 @@ public class ClusterBlock implements Streamable, ToXContentFragment { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - ClusterBlock that = (ClusterBlock) o; - - if (id != that.id) return false; - - return true; + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final ClusterBlock that = (ClusterBlock) o; + return id == that.id && Objects.equals(uuid, that.uuid); } @Override public int hashCode() { - return id; + return Objects.hash(id, uuid); } public boolean isAllowReleaseResources() { diff --git a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java index 941fee6da7d..0de7bce1159 100644 --- a/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java +++ b/server/src/main/java/org/elasticsearch/cluster/block/ClusterBlocks.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.AbstractDiffable; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -147,6 +148,31 @@ public class ClusterBlocks extends AbstractDiffable { return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block); } + public boolean hasIndexBlockWithId(String index, int blockId) { + final Set clusterBlocks = indicesBlocks.get(index); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == blockId) { + return true; + } + } + } + return false; + } + + @Nullable + public ClusterBlock getIndexBlockWithId(final String index, final int blockId) { + final Set clusterBlocks = indicesBlocks.get(index); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == blockId) { + return clusterBlock; + } + } + } + return null; + } + public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException { ClusterBlockException blockException = globalBlockedException(level); if (blockException != null) { @@ -403,6 +429,18 @@ public class ClusterBlocks extends AbstractDiffable { return this; } + public Builder removeIndexBlockWithId(String index, int blockId) { + final Set indexBlocks = indices.get(index); + if (indexBlocks == null) { + return this; + } + indexBlocks.removeIf(block -> block.id() == blockId); + if (indexBlocks.isEmpty()) { + indices.remove(index); + } + return this; + } + public ClusterBlocks build() { // We copy the block sets here in case of the builder is modified after build is called ImmutableOpenMap.Builder> indicesBuilder = ImmutableOpenMap.builder(indices.size()); diff --git a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java index 6ceda4bf57d..aa4434a0a74 100644 --- a/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java +++ b/server/src/main/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateService.java @@ -47,6 +47,8 @@ import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Priority; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.collect.ImmutableOpenIntMap; import org.elasticsearch.common.inject.Inject; @@ -67,6 +69,8 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -83,6 +87,7 @@ import static java.util.Collections.unmodifiableMap; public class MetaDataIndexStateService { private static final Logger logger = LogManager.getLogger(MetaDataIndexStateService.class); + public static final int INDEX_CLOSED_BLOCK_ID = 4; public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false, false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE); @@ -123,11 +128,11 @@ public class MetaDataIndexStateService { clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices), new ClusterStateUpdateTask(Priority.URGENT) { - private final Set blockedIndices = new HashSet<>(); + private final Map blockedIndices = new HashMap<>(); @Override public ClusterState execute(final ClusterState currentState) { - return addIndexClosedBlocks(concreteIndices, currentState, blockedIndices); + return addIndexClosedBlocks(concreteIndices, blockedIndices, currentState); } @Override @@ -139,11 +144,21 @@ public class MetaDataIndexStateService { assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed"; threadPool.executor(ThreadPool.Names.MANAGEMENT) .execute(new WaitForClosedBlocksApplied(blockedIndices, request, - ActionListener.wrap(closedBlocksResults -> + ActionListener.wrap(results -> clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) { + + boolean acknowledged = true; + @Override public ClusterState execute(final ClusterState currentState) throws Exception { - final ClusterState updatedState = closeRoutingTable(currentState, closedBlocksResults); + final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results); + for (Map.Entry result : results.entrySet()) { + IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey()); + if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) { + acknowledged = false; + break; + } + } return allocationService.reroute(updatedState, "indices closed"); } @@ -155,8 +170,6 @@ public class MetaDataIndexStateService { @Override public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) { - boolean acknowledged = closedBlocksResults.values().stream() - .allMatch(AcknowledgedResponse::isAcknowledged); listener.onResponse(new AcknowledgedResponse(acknowledged)); } }), @@ -182,11 +195,12 @@ public class MetaDataIndexStateService { /** * Step 1 - Start closing indices by adding a write block * - * This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds the index block - * {@link #INDEX_CLOSED_BLOCK} to every index to close in the cluster state. After the cluster state is published, the shards should - * start to reject writing operations and we can proceed with step 2. + * This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds a unique cluster + * block (or reuses an existing one) to every index to close in the cluster state. After the cluster state is published, the shards + * should start to reject writing operations and we can proceed with step 2. */ - static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState currentState, final Set blockedIndices) { + static ClusterState addIndexClosedBlocks(final Index[] indices, final Map blockedIndices, + final ClusterState currentState) { final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final Set indicesToClose = new HashSet<>(); @@ -196,6 +210,7 @@ public class MetaDataIndexStateService { indicesToClose.add(indexMetaData); } else { logger.debug("index {} is already closed, ignoring", index); + assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); } } @@ -217,19 +232,37 @@ public class MetaDataIndexStateService { for (IndexMetaData indexToClose : indicesToClose) { final Index index = indexToClose.getIndex(); - if (currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK) == false) { - blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); + + ClusterBlock indexBlock = null; + final Set clusterBlocks = currentState.blocks().indices().get(index.getName()); + if (clusterBlocks != null) { + for (ClusterBlock clusterBlock : clusterBlocks) { + if (clusterBlock.id() == INDEX_CLOSED_BLOCK_ID) { + // Reuse the existing index closed block + indexBlock = clusterBlock; + break; + } + } } if (useDirectClose) { logger.debug("closing index {} directly", index); - metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); + metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); // increment version? + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); routingTable.remove(index.getName()); + indexBlock = INDEX_CLOSED_BLOCK; + } else { + if (indexBlock == null) { + // Create a new index closed block + indexBlock = createIndexClosingBlock(); + } + assert Strings.hasLength(indexBlock.uuid()) : "Closing block should have a UUID"; } - blockedIndices.add(index); + blocks.addIndexBlock(index.getName(), indexBlock); + blockedIndices.put(index, indexBlock); } logger.info(() -> new ParameterizedMessage("closing indices {}", - blockedIndices.stream().map(Object::toString).collect(Collectors.joining(",")))); + blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.joining(",")))); return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build(); } @@ -242,15 +275,15 @@ public class MetaDataIndexStateService { */ class WaitForClosedBlocksApplied extends AbstractRunnable { - private final Set blockedIndices; + private final Map blockedIndices; private final CloseIndexClusterStateUpdateRequest request; private final ActionListener> listener; - private WaitForClosedBlocksApplied(final Set blockedIndices, + private WaitForClosedBlocksApplied(final Map blockedIndices, final CloseIndexClusterStateUpdateRequest request, final ActionListener> listener) { if (blockedIndices == null || blockedIndices.isEmpty()) { - throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices"); + throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null"); } this.blockedIndices = blockedIndices; this.request = request; @@ -267,18 +300,18 @@ public class MetaDataIndexStateService { final Map results = ConcurrentCollections.newConcurrentMap(); final CountDown countDown = new CountDown(blockedIndices.size()); final ClusterState state = clusterService.state(); - for (Index blockedIndex : blockedIndices) { - waitForShardsReadyForClosing(blockedIndex, state, response -> { - results.put(blockedIndex, response); + blockedIndices.forEach((index, block) -> { + waitForShardsReadyForClosing(index, block, state, response -> { + results.put(index, response); if (countDown.countDown()) { listener.onResponse(unmodifiableMap(results)); } }); - } + }); } - private void waitForShardsReadyForClosing(final Index index, final ClusterState state, - final Consumer onResponse) { + private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock, + final ClusterState state, final Consumer onResponse) { final IndexMetaData indexMetaData = state.metaData().index(index); if (indexMetaData == null) { logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index); @@ -287,6 +320,7 @@ public class MetaDataIndexStateService { } final IndexRoutingTable indexRoutingTable = state.routingTable().index(index); if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) { + assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); logger.debug("index {} has been blocked before closing and is already closed, ignoring", index); onResponse.accept(new AcknowledgedResponse(true)); return; @@ -299,7 +333,7 @@ public class MetaDataIndexStateService { for (IntObjectCursor shard : shards) { final IndexShardRoutingTable shardRoutingTable = shard.value; final ShardId shardId = shardRoutingTable.shardId(); - sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener() { + sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener() { @Override public void innerOnResponse(final ReplicationResponse replicationResponse) { ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo(); @@ -324,6 +358,7 @@ public class MetaDataIndexStateService { } private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, + final ClusterBlock closingBlock, final ActionListener listener) { final ShardId shardId = shardRoutingTable.shardId(); if (shardRoutingTable.primaryShard().unassigned()) { @@ -335,11 +370,10 @@ public class MetaDataIndexStateService { } final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId()); final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId); if (request.ackTimeout() != null) { shardRequest.timeout(request.ackTimeout()); } - // TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests transportVerifyShardBeforeCloseAction.execute(shardRequest, listener); } } @@ -347,7 +381,9 @@ public class MetaDataIndexStateService { /** * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing. */ - static ClusterState closeRoutingTable(final ClusterState currentState, final Map results) { + static ClusterState closeRoutingTable(final ClusterState currentState, + final Map blockedIndices, + final Map results) { final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable()); @@ -355,21 +391,29 @@ public class MetaDataIndexStateService { final Set closedIndices = new HashSet<>(); for (Map.Entry result : results.entrySet()) { final Index index = result.getKey(); + final boolean acknowledged = result.getValue().isAcknowledged(); try { - final IndexMetaData indexMetaData = metadata.getSafe(index); - if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { - if (result.getValue().isAcknowledged()) { - logger.debug("closing index {} succeed, removing index routing table", index); - metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)); - routingTable.remove(index.getName()); - closedIndices.add(index.getName()); - } else { - logger.debug("closing index {} failed, removing index block because: {}", index, result.getValue()); - blocks.removeIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); - } - } else { - logger.debug("index {} has been closed since it was blocked before closing, ignoring", index); + if (acknowledged == false) { + logger.debug("verification of shards before closing {} failed", index); + continue; } + final IndexMetaData indexMetaData = metadata.getSafe(index); + if (indexMetaData.getState() == IndexMetaData.State.CLOSE) { + logger.debug("verification of shards before closing {} succeeded but index is already closed", index); + assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); + continue; + } + final ClusterBlock closingBlock = blockedIndices.get(index); + if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) { + logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index); + continue; + } + + logger.debug("closing index {} succeeded", index); + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK); + metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE)); + routingTable.remove(index.getName()); + closedIndices.add(index.getName()); } catch (final IndexNotFoundException e) { logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index); } @@ -405,64 +449,73 @@ public class MetaDataIndexStateService { final String indicesAsString = Arrays.toString(request.indices()); clusterService.submitStateUpdateTask("open-indices " + indicesAsString, - new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { - @Override - protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { - return new ClusterStateUpdateResponse(acknowledged); + new AckedClusterStateUpdateTask(Priority.URGENT, request, listener) { + @Override + protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { + return new ClusterStateUpdateResponse(acknowledged); + } + + @Override + public ClusterState execute(final ClusterState currentState) { + final ClusterState updatedState = openIndices(request.indices(), currentState); + //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask + return allocationService.reroute(updatedState, "indices opened [" + indicesAsString + "]"); + } + } + ); + } + + ClusterState openIndices(final Index[] indices, final ClusterState currentState) { + final List indicesToOpen = new ArrayList<>(); + for (Index index : indices) { + final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index); + if (indexMetaData.getState() != IndexMetaData.State.OPEN) { + indicesToOpen.add(indexMetaData); + } else if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) { + indicesToOpen.add(indexMetaData); + } + } + + validateShardLimit(currentState, indices); + if (indicesToOpen.isEmpty()) { + return currentState; + } + + logger.info(() -> new ParameterizedMessage("opening indices [{}]", + String.join(",", indicesToOpen.stream().map(i -> (CharSequence) i.getIndex().toString())::iterator))); + + final MetaData.Builder metadata = MetaData.builder(currentState.metaData()); + final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); + final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion(); + + for (IndexMetaData indexMetaData : indicesToOpen) { + final Index index = indexMetaData.getIndex(); + if (indexMetaData.getState() != IndexMetaData.State.OPEN) { + IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build(); + // The index might be closed because we couldn't import it due to old incompatible version + // We need to check that this index can be upgraded to the current version + updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion); + try { + indicesService.verifyIndexMetadata(updatedIndexMetaData, updatedIndexMetaData); + } catch (Exception e) { + throw new ElasticsearchException("Failed to verify index " + index, e); + } + metadata.put(updatedIndexMetaData, true); } - @Override - public ClusterState execute(ClusterState currentState) { - List indicesToOpen = new ArrayList<>(); - for (Index index : request.indices()) { - final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index); - if (indexMetaData.getState() != IndexMetaData.State.OPEN) { - indicesToOpen.add(indexMetaData); - } - } + // Always removes index closed blocks (note: this can fail on-going close index actions) + blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID); + } - validateShardLimit(currentState, request.indices()); + ClusterState updatedState = ClusterState.builder(currentState).metaData(metadata).blocks(blocks).build(); - if (indicesToOpen.isEmpty()) { - return currentState; - } - - logger.info("opening indices [{}]", indicesAsString); - - MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); - ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder() - .blocks(currentState.blocks()); - final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion() - .minimumIndexCompatibilityVersion(); - for (IndexMetaData closedMetaData : indicesToOpen) { - final String indexName = closedMetaData.getIndex().getName(); - IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build(); - // The index might be closed because we couldn't import it due to old incompatible version - // We need to check that this index can be upgraded to the current version - indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion); - try { - indicesService.verifyIndexMetadata(indexMetaData, indexMetaData); - } catch (Exception e) { - throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e); - } - - mdBuilder.put(indexMetaData, true); - blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK); - } - - ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build(); - - RoutingTable.Builder rtBuilder = RoutingTable.builder(updatedState.routingTable()); - for (IndexMetaData index : indicesToOpen) { - rtBuilder.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(index.getIndex())); - } - - //no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask - return allocationService.reroute( - ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(), - "indices opened [" + indicesAsString + "]"); + final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable()); + for (IndexMetaData previousIndexMetaData : indicesToOpen) { + if (previousIndexMetaData.getState() != IndexMetaData.State.OPEN) { + routingTable.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(previousIndexMetaData.getIndex())); } - }); + } + return ClusterState.builder(updatedState).routingTable(routingTable.build()).build(); } /** @@ -492,4 +545,14 @@ public class MetaDataIndexStateService { return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas()); } + /** + * @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The + * cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID. + */ + public static ClusterBlock createIndexClosingBlock() { + return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), "index preparing to close. Reopen the index to allow " + + "writes again or retry closing the index to fully close the index.", false, false, false, RestStatus.FORBIDDEN, + EnumSet.of(ClusterBlockLevel.WRITE)); + } + } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index c0da96ed1ef..6fc744db2f3 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -29,9 +29,11 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction.C import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.action.shard.ShardStateAction; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; @@ -61,7 +63,6 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; -import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK; import static org.elasticsearch.test.ClusterServiceUtils.createClusterService; import static org.elasticsearch.test.ClusterServiceUtils.setState; import static org.hamcrest.Matchers.arrayWithSize; @@ -81,6 +82,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { private IndexShard indexShard; private TransportVerifyShardBeforeCloseAction action; private ClusterService clusterService; + private ClusterBlock clusterBlock; private CapturingTransport transport; @BeforeClass @@ -102,8 +104,10 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { when(indexShard.shardId()).thenReturn(shardId); clusterService = createClusterService(threadPool); + + clusterBlock = MetaDataIndexStateService.createIndexClosingBlock(); setState(clusterService, new ClusterState.Builder(clusterService.state()) - .blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build()); + .blocks(ClusterBlocks.builder().blocks(clusterService.state().blocks()).addIndexBlock("index", clusterBlock).build()).build()); transport = new CapturingTransport(); TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool, @@ -130,8 +134,9 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { } private void executeOnPrimaryOrReplica() throws Exception { + final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong()); final TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong())); + new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId); if (randomBoolean()) { assertNotNull(action.shardOperationOnPrimary(request, indexShard)); } else { @@ -158,7 +163,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica); assertThat(exception.getMessage(), - equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + INDEX_CLOSED_BLOCK + " before closing")); + equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing")); verify(indexShard, times(0)).flush(any(FlushRequest.class)); } @@ -205,8 +210,9 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase { assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0)); final PlainActionFuture listener = new PlainActionFuture<>(); + TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L); TransportVerifyShardBeforeCloseAction.ShardRequest request = - new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L)); + new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId); ReplicationOperation.Replicas proxy = action.newReplicasProxy(primaryTerm); ReplicationOperation operation = diff --git a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java index a84d160cf0c..4cf7b6f9c6d 100644 --- a/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/block/ClusterBlockTests.java @@ -20,36 +20,35 @@ package org.elasticsearch.cluster.block; import org.elasticsearch.Version; +import org.elasticsearch.common.UUIDs; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.test.ESTestCase; +import java.util.Arrays; import java.util.Collections; -import java.util.EnumSet; +import java.util.List; +import static java.util.EnumSet.copyOf; +import static org.elasticsearch.test.VersionUtils.getPreviousVersion; import static org.elasticsearch.test.VersionUtils.randomVersion; +import static org.elasticsearch.test.VersionUtils.randomVersionBetween; import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.nullValue; public class ClusterBlockTests extends ESTestCase { + public void testSerialization() throws Exception { - int iterations = randomIntBetween(10, 100); + int iterations = randomIntBetween(5, 20); for (int i = 0; i < iterations; i++) { - // Get a random version Version version = randomVersion(random()); - - // Get a random list of ClusterBlockLevels - EnumSet levels = EnumSet.noneOf(ClusterBlockLevel.class); - int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length); - for (int j = 0; j < nbLevels; j++) { - levels.add(randomFrom(ClusterBlockLevel.values())); - } - - ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(), - randomBoolean(), false, randomFrom(RestStatus.values()), levels); + ClusterBlock clusterBlock = randomClusterBlock(version); BytesStreamOutput out = new BytesStreamOutput(); out.setVersion(version); @@ -59,37 +58,133 @@ public class ClusterBlockTests extends ESTestCase { in.setVersion(version); ClusterBlock result = ClusterBlock.readClusterBlock(in); - assertThat(result.id(), equalTo(clusterBlock.id())); - assertThat(result.status(), equalTo(clusterBlock.status())); - assertThat(result.description(), equalTo(clusterBlock.description())); - assertThat(result.retryable(), equalTo(clusterBlock.retryable())); - assertThat(result.disableStatePersistence(), equalTo(clusterBlock.disableStatePersistence())); - assertArrayEquals(result.levels().toArray(), clusterBlock.levels().toArray()); + assertClusterBlockEquals(clusterBlock, result); + } + } + + public void testBwcSerialization() throws Exception { + for (int runs = 0; runs < randomIntBetween(5, 20); runs++) { + // Generate a random cluster block in version < 7.0.0 + final Version version = randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0)); + final ClusterBlock expected = randomClusterBlock(version); + assertNull(expected.uuid()); + + // Serialize to node in current version + final BytesStreamOutput out = new BytesStreamOutput(); + expected.writeTo(out); + + // Deserialize and check the cluster block + final ClusterBlock actual = ClusterBlock.readClusterBlock(out.bytes().streamInput()); + assertClusterBlockEquals(expected, actual); + } + + for (int runs = 0; runs < randomIntBetween(5, 20); runs++) { + // Generate a random cluster block in current version + final ClusterBlock expected = randomClusterBlock(Version.CURRENT); + + // Serialize to node in version < 7.0.0 + final BytesStreamOutput out = new BytesStreamOutput(); + out.setVersion(randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0))); + expected.writeTo(out); + + // Deserialize and check the cluster block + final StreamInput in = out.bytes().streamInput(); + in.setVersion(out.getVersion()); + final ClusterBlock actual = ClusterBlock.readClusterBlock(in); + + assertThat(actual.id(), equalTo(expected.id())); + assertThat(actual.status(), equalTo(expected.status())); + assertThat(actual.description(), equalTo(expected.description())); + assertThat(actual.retryable(), equalTo(expected.retryable())); + assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence())); + assertArrayEquals(actual.levels().toArray(), expected.levels().toArray()); } } public void testToStringDanglingComma() { - EnumSet levels = EnumSet.noneOf(ClusterBlockLevel.class); - int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length); - for (int j = 0; j < nbLevels; j++) { - levels.add(randomFrom(ClusterBlockLevel.values())); - } - ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(), - randomBoolean(), false, randomFrom(RestStatus.values()), levels); + final ClusterBlock clusterBlock = randomClusterBlock(); assertThat(clusterBlock.toString(), not(endsWith(","))); } public void testGlobalBlocksCheckedIfNoIndicesSpecified() { - EnumSet levels = EnumSet.noneOf(ClusterBlockLevel.class); - int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length); - for (int j = 0; j < nbLevels; j++) { - levels.add(randomFrom(ClusterBlockLevel.values())); - } - ClusterBlock globalBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(), - randomBoolean(), false, randomFrom(RestStatus.values()), levels); + ClusterBlock globalBlock = randomClusterBlock(); ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), ImmutableOpenMap.of()); ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]); assertNotNull(exception); assertEquals(exception.blocks(), Collections.singleton(globalBlock)); } + + public void testRemoveIndexBlockWithId() { + final ClusterBlocks.Builder builder = ClusterBlocks.builder(); + builder.addIndexBlock("index-1", + new ClusterBlock(1, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL))); + builder.addIndexBlock("index-1", + new ClusterBlock(2, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL))); + builder.addIndexBlock("index-1", + new ClusterBlock(3, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL))); + builder.addIndexBlock("index-1", + new ClusterBlock(3, "other uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL))); + + builder.addIndexBlock("index-2", + new ClusterBlock(3, "uuid3", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL))); + + ClusterBlocks clusterBlocks = builder.build(); + assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(4)); + assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1)); + + builder.removeIndexBlockWithId("index-1", 3); + clusterBlocks = builder.build(); + + assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true)); + assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(true)); + + builder.removeIndexBlockWithId("index-2", 3); + clusterBlocks = builder.build(); + + assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true)); + assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true)); + assertThat(clusterBlocks.indices().get("index-2"), nullValue()); + assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(false)); + } + + public void testGetIndexBlockWithId() { + final int blockId = randomInt(); + final ClusterBlock[] clusterBlocks = new ClusterBlock[randomIntBetween(1, 5)]; + + final ClusterBlocks.Builder builder = ClusterBlocks.builder(); + for (int i = 0; i < clusterBlocks.length; i++) { + clusterBlocks[i] = new ClusterBlock(blockId, "uuid" + i, "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)); + builder.addIndexBlock("index", clusterBlocks[i]); + } + + assertThat(builder.build().indices().get("index").size(), equalTo(clusterBlocks.length)); + assertThat(builder.build().getIndexBlockWithId("index", blockId), isOneOf(clusterBlocks)); + assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue()); + } + + private ClusterBlock randomClusterBlock() { + return randomClusterBlock(randomVersion(random())); + } + + private ClusterBlock randomClusterBlock(final Version version) { + final String uuid = (version.onOrAfter(Version.V_7_0_0) && randomBoolean()) ? UUIDs.randomBase64UUID() : null; + final List levels = Arrays.asList(ClusterBlockLevel.values()); + return new ClusterBlock(randomInt(), uuid, "cluster block #" + randomInt(), randomBoolean(), randomBoolean(), randomBoolean(), + randomFrom(RestStatus.values()), copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels))); + } + + private void assertClusterBlockEquals(final ClusterBlock expected, final ClusterBlock actual) { + assertEquals(expected, actual); + assertThat(actual.id(), equalTo(expected.id())); + assertThat(actual.uuid(), equalTo(expected.uuid())); + assertThat(actual.status(), equalTo(expected.status())); + assertThat(actual.description(), equalTo(expected.description())); + assertThat(actual.retryable(), equalTo(expected.retryable())); + assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence())); + assertArrayEquals(actual.levels().toArray(), expected.levels().toArray()); + } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java index 6faaf8e1338..c30925514bb 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceTests.java @@ -56,9 +56,14 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static java.util.Collections.unmodifiableMap; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID; import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting; import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.hamcrest.Matchers.containsString; @@ -73,32 +78,37 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { public void testCloseRoutingTable() { final Set nonBlockedIndices = new HashSet<>(); - final Map blockedIndices = new HashMap<>(); + final Map blockedIndices = new HashMap<>(); + final Map results = new HashMap<>(); ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build(); for (int i = 0; i < randomIntBetween(1, 25); i++) { - final String indexName = randomAlphaOfLengthBetween(5, 15); + final String indexName = "index-" + i; if (randomBoolean()) { state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state); nonBlockedIndices.add(state.metaData().index(indexName).getIndex()); } else { - state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state); - blockedIndices.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); + final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock(); + state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock); + blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock); + results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean())); } } - final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices); + final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size())); for (Index nonBlockedIndex : nonBlockedIndices) { assertIsOpened(nonBlockedIndex.getName(), updatedState); + assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false)); } - for (Map.Entry blockedIndex : blockedIndices.entrySet()) { - if (blockedIndex.getValue().isAcknowledged()) { - assertIsClosed(blockedIndex.getKey().getName(), updatedState); + for (Index blockedIndex : blockedIndices.keySet()) { + if (results.get(blockedIndex).isAcknowledged()) { + assertIsClosed(blockedIndex.getName(), updatedState); } else { - assertIsOpened(blockedIndex.getKey().getName(), updatedState); + assertIsOpened(blockedIndex.getName(), updatedState); + assertThat(updatedState.blocks().hasIndexBlockWithId(blockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(true)); } } } @@ -106,39 +116,45 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { public void testAddIndexClosedBlocks() { final ClusterState initialState = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build(); { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); + Index[] indices = new Index[]{new Index("_name", "_uid")}; expectThrows(IndexNotFoundException.class, () -> - MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{new Index("_name", "_uid")}, initialState, blockedIndices)); + MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState)); assertTrue(blockedIndices.isEmpty()); } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); Index[] indices = Index.EMPTY_ARRAY; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, initialState, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState); assertSame(initialState, updatedState); assertTrue(blockedIndices.isEmpty()); } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); Index[] indices = new Index[]{state.metaData().index("closed").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); assertSame(state, updatedState); assertTrue(blockedIndices.isEmpty()); + } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state); Index[] indices = new Index[]{state.metaData().index("opened").getIndex(), state.metaData().index("closed").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); assertNotSame(state, updatedState); - assertTrue(blockedIndices.contains(updatedState.metaData().index("opened").getIndex())); - assertFalse(blockedIndices.contains(updatedState.metaData().index("closed").getIndex())); - assertIsBlocked("opened", updatedState, true); + + Index opened = updatedState.metaData().index("opened").getIndex(); + assertTrue(blockedIndices.containsKey(opened)); + assertHasBlock("opened", updatedState, blockedIndices.get(opened)); + + Index closed = updatedState.metaData().index("closed").getIndex(); + assertFalse(blockedIndices.containsKey(closed)); } { IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> { @@ -150,7 +166,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state); } Index[] indices = new Index[]{state.metaData().index("restored").getIndex()}; - MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>()); + MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state); }); assertThat(exception.getMessage(), containsString("Cannot close indices that are being restored: [[restored]]")); } @@ -164,12 +180,12 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state); } Index[] indices = new Index[]{state.metaData().index("snapshotted").getIndex()}; - MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>()); + MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state); }); assertThat(exception.getMessage(), containsString("Cannot close indices that are being snapshotted: [[snapshotted]]")); } { - final Set blockedIndices = new HashSet<>(); + final Map blockedIndices = new HashMap<>(); ClusterState state = addOpenedIndex("index-1", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState); state = addOpenedIndex("index-2", randomIntBetween(1, 3), randomIntBetween(0, 3), state); state = addOpenedIndex("index-3", randomIntBetween(1, 3), randomIntBetween(0, 3), state); @@ -177,30 +193,51 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { if (mixedVersions) { state = ClusterState.builder(state) .nodes(DiscoveryNodes.builder(state.nodes()) - .add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), Collections.emptyMap(), + .add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), emptyMap(), new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_6_0_0))) .build(); } - Index[] indices = new Index[]{state.metaData().index("index-1").getIndex(), - state.metaData().index("index-2").getIndex(), state.metaData().index("index-3").getIndex()}; - ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + Index index1 = state.metaData().index("index-1").getIndex(); + Index index2 = state.metaData().index("index-2").getIndex(); + Index index3 = state.metaData().index("index-3").getIndex(); + Index[] indices = new Index[]{index1, index2, index3}; + + ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); assertNotSame(state, updatedState); - assertTrue(blockedIndices.contains(updatedState.metaData().index("index-1").getIndex())); - assertTrue(blockedIndices.contains(updatedState.metaData().index("index-2").getIndex())); - assertTrue(blockedIndices.contains(updatedState.metaData().index("index-3").getIndex())); - if (mixedVersions) { - assertIsClosed("index-1", updatedState); - assertIsClosed("index-2", updatedState); - assertIsClosed("index-2", updatedState); - } else { - assertIsBlocked("index-1", updatedState, true); - assertIsBlocked("index-2", updatedState, true); - assertIsBlocked("index-3", updatedState, true); + + for (Index index : indices) { + assertTrue(blockedIndices.containsKey(index)); + if (mixedVersions) { + assertIsClosed(index.getName(), updatedState); + } else { + assertHasBlock(index.getName(), updatedState, blockedIndices.get(index)); + } } } } + public void testAddIndexClosedBlocksReusesBlocks() { + ClusterState state = ClusterState.builder(new ClusterName("testAddIndexClosedBlocksReuseBlocks")).build(); + state = addOpenedIndex("test", randomIntBetween(1, 3), randomIntBetween(0, 3), state); + + Index test = state.metaData().index("test").getIndex(); + Index[] indices = new Index[]{test}; + + final Map blockedIndices = new HashMap<>(); + state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); + + assertTrue(blockedIndices.containsKey(test)); + assertHasBlock(test.getName(), state, blockedIndices.get(test)); + + final Map blockedIndices2 = new HashMap<>(); + state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices2, state); + + assertTrue(blockedIndices2.containsKey(test)); + assertHasBlock(test.getName(), state, blockedIndices2.get(test)); + assertEquals(blockedIndices.get(test), blockedIndices2.get(test)); + } + public void testValidateShardLimit() { int nodesInCluster = randomIntBetween(2,100); ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); @@ -251,11 +288,12 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { } private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { - return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, INDEX_CLOSED_BLOCK); } - private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { - return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state, + final ClusterBlock closingBlock) { + return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, closingBlock); } private static ClusterState addRestoredIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) { @@ -329,16 +367,21 @@ public class MetaDataIndexStateServiceTests extends ESTestCase { private static void assertIsOpened(final String indexName, final ClusterState clusterState) { assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.OPEN)); assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertIsBlocked(indexName, clusterState, false); } private static void assertIsClosed(final String indexName, final ClusterState clusterState) { assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE)); assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertIsBlocked(indexName, clusterState, true); + assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true)); + assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); } - private static void assertIsBlocked(final String indexName, final ClusterState clusterState, final boolean blocked) { - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(blocked)); + private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) { + assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true)); + assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java index a9ffd4c47e1..5ee6a7c60da 100644 --- a/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java +++ b/server/src/test/java/org/elasticsearch/cluster/metadata/MetaDataIndexStateServiceUtils.java @@ -20,10 +20,10 @@ package org.elasticsearch.cluster.metadata; import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.index.Index; import java.util.Map; -import java.util.Set; public class MetaDataIndexStateServiceUtils { @@ -31,16 +31,19 @@ public class MetaDataIndexStateServiceUtils { } /** - * Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], ClusterState, Set)} which is a protected method. + * Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], Map, ClusterState)} which is a protected method. */ - public static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState state, final Set blockedIndices) { - return MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices); + public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map blockedIndices, + final ClusterState state) { + return MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state); } /** - * Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map)} which is a protected method. + * Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map, Map)} which is a protected method. */ - public static ClusterState closeRoutingTable(final ClusterState state, final Map results) { - return MetaDataIndexStateService.closeRoutingTable(state, results); + public static ClusterState closeRoutingTable(final ClusterState state, + final Map blockedIndices, + final Map results) { + return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results); } } diff --git a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java index 1403543078b..387ba1c3d96 100644 --- a/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java +++ b/server/src/test/java/org/elasticsearch/indices/cluster/ClusterStateChanges.java @@ -52,6 +52,7 @@ import org.elasticsearch.cluster.EmptyClusterInfoService; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry; import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry; +import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.coordination.JoinTaskExecutor; import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor; import org.elasticsearch.cluster.metadata.AliasValidator; @@ -94,9 +95,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; -import java.util.Set; +import java.util.Map; import java.util.function.Function; import java.util.stream.Collectors; @@ -222,10 +224,10 @@ public class ClusterStateChanges { final Index[] concreteIndices = Arrays.stream(request.indices()) .map(index -> state.metaData().index(index).getIndex()).toArray(Index[]::new); - final Set blockedIndices = new HashSet<>(); - ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, state, blockedIndices); + final Map blockedIndices = new HashMap<>(); + ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state); - newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices.stream() + newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream() .collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true)))); return allocationService.reroute(newState, "indices closed"); } diff --git a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java index c91189972c7..a0304c96430 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/CloseIndexIT.java @@ -39,12 +39,12 @@ import java.util.Locale; import java.util.concurrent.CountDownLatch; import java.util.stream.IntStream; +import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toList; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; @@ -135,6 +135,7 @@ public class CloseIndexIT extends ESIntegTestCase { final int nbDocs = randomIntBetween(10, 50); indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs) .mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList())); + ensureYellowAndNoInitializingShards(indexName); final CountDownLatch startClosing = new CountDownLatch(1); final Thread[] threads = new Thread[randomIntBetween(2, 5)]; @@ -146,7 +147,11 @@ public class CloseIndexIT extends ESIntegTestCase { } catch (InterruptedException e) { throw new AssertionError(e); } - assertAcked(client().admin().indices().prepareClose(indexName)); + try { + client().admin().indices().prepareClose(indexName).get(); + } catch (final Exception e) { + assertException(e, indexName); + } }); threads[i].start(); } @@ -238,18 +243,84 @@ public class CloseIndexIT extends ESIntegTestCase { } } - static void assertIndexIsClosed(final String indexName) { + public void testConcurrentClosesAndOpens() throws Exception { + final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT); + createIndex(indexName); + + final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client()); + waitForDocs(1, indexer); + + final CountDownLatch latch = new CountDownLatch(1); + final Runnable waitForLatch = () -> { + try { + latch.await(); + } catch (final InterruptedException e) { + throw new AssertionError(e); + } + }; + + final List threads = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(1, 3); i++) { + threads.add(new Thread(() -> { + try { + waitForLatch.run(); + client().admin().indices().prepareClose(indexName).get(); + } catch (final Exception e) { + throw new AssertionError(e); + } + })); + } + for (int i = 0; i < randomIntBetween(1, 3); i++) { + threads.add(new Thread(() -> { + try { + waitForLatch.run(); + assertAcked(client().admin().indices().prepareOpen(indexName).get()); + } catch (final Exception e) { + throw new AssertionError(e); + } + })); + } + + for (Thread thread : threads) { + thread.start(); + } + latch.countDown(); + for (Thread thread : threads) { + thread.join(); + } + + indexer.setAssertNoFailuresOnStop(false); + indexer.stop(); + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.CLOSE)); - assertThat(clusterState.routingTable().index(indexName), nullValue()); - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true)); + if (clusterState.metaData().indices().get(indexName).getState() == IndexMetaData.State.CLOSE) { + assertIndexIsClosed(indexName); + assertAcked(client().admin().indices().prepareOpen(indexName)); + } + refresh(indexName); + assertIndexIsOpened(indexName); + assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexer.totalIndexedDocs()); } - static void assertIndexIsOpened(final String indexName) { + static void assertIndexIsClosed(final String... indices) { final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); - assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN)); - assertThat(clusterState.routingTable().index(indexName), notNullValue()); - assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false)); + for (String index : indices) { + assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.CLOSE)); + assertThat(clusterState.routingTable().index(index), nullValue()); + assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true)); + assertThat("Index " + index + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(index, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); + } + } + + static void assertIndexIsOpened(final String... indices) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN)); + assertThat(clusterState.routingTable().index(index), notNullValue()); + assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false)); + } } static void assertException(final Throwable throwable, final String indexName) { @@ -257,7 +328,7 @@ public class CloseIndexIT extends ESIntegTestCase { if (t instanceof ClusterBlockException) { ClusterBlockException clusterBlockException = (ClusterBlockException) t; assertThat(clusterBlockException.blocks(), hasSize(1)); - assertThat(clusterBlockException.blocks(), hasItem(MetaDataIndexStateService.INDEX_CLOSED_BLOCK)); + assertTrue(clusterBlockException.blocks().stream().allMatch(b -> b.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID)); } else if (t instanceof IndexClosedException) { IndexClosedException indexClosedException = (IndexClosedException) t; assertThat(indexClosedException.getIndex(), notNullValue()); diff --git a/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java b/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java index ddbbd0ea73a..e9e9108f5e8 100644 --- a/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java +++ b/server/src/test/java/org/elasticsearch/indices/state/OpenCloseIndexIT.java @@ -21,7 +21,6 @@ package org.elasticsearch.indices.state; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; -import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; @@ -46,6 +45,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_RE import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY; import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; @@ -53,7 +54,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.notNullValue; public class OpenCloseIndexIT extends ESIntegTestCase { public void testSimpleCloseOpen() { @@ -258,23 +258,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase { ensureGreen("test"); } - private void assertIndexIsOpened(String... indices) { - checkIndexState(IndexMetaData.State.OPEN, indices); - } - - private void assertIndexIsClosed(String... indices) { - checkIndexState(IndexMetaData.State.CLOSE, indices); - } - - private void checkIndexState(IndexMetaData.State expectedState, String... indices) { - ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet(); - for (String index : indices) { - IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index); - assertThat(indexMetaData, notNullValue()); - assertThat(indexMetaData.getState(), equalTo(expectedState)); - } - } - public void testOpenCloseWithDocs() throws IOException, ExecutionException, InterruptedException { String mapping = Strings.toString(XContentFactory.jsonBuilder(). startObject(). diff --git a/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java new file mode 100644 index 00000000000..901c4f327af --- /dev/null +++ b/server/src/test/java/org/elasticsearch/indices/state/ReopenWhileClosingIT.java @@ -0,0 +1,167 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.state; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction; +import org.elasticsearch.action.support.master.AcknowledgedResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Glob; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.util.concurrent.RunOnce; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; +import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed; +import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2) +public class ReopenWhileClosingIT extends ESIntegTestCase { + + @Override + protected Collection> nodePlugins() { + return singletonList(MockTransportService.TestPlugin.class); + } + + public void testReopenDuringClose() throws Exception { + final String indexName = "test"; + createIndexWithDocs(indexName); + + ensureYellowAndNoInitializingShards(indexName); + + final CountDownLatch block = new CountDownLatch(1); + final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown); + + ActionFuture closeIndexResponse = client().admin().indices().prepareClose(indexName).execute(); + assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); + assertIndexIsBlocked(indexName); + assertFalse(closeIndexResponse.isDone()); + + assertAcked(client().admin().indices().prepareOpen(indexName)); + + releaseBlock.close(); + assertFalse(closeIndexResponse.get().isAcknowledged()); + assertIndexIsOpened(indexName); + } + + public void testReopenDuringCloseOnMultipleIndices() throws Exception { + final List indices = new ArrayList<>(); + for (int i = 0; i < randomIntBetween(2, 10); i++) { + indices.add("index-" + i); + createIndexWithDocs(indices.get(i)); + } + + ensureYellowAndNoInitializingShards(indices.toArray(Strings.EMPTY_ARRAY)); + + final CountDownLatch block = new CountDownLatch(1); + final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown); + + ActionFuture closeIndexResponse = client().admin().indices().prepareClose("index-*").execute(); + assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS)); + assertFalse(closeIndexResponse.isDone()); + indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked); + + final List reopenedIndices = randomSubsetOf(randomIntBetween(1, indices.size()), indices); + assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY))); + + releaseBlock.close(); + assertFalse(closeIndexResponse.get().isAcknowledged()); + + indices.forEach(index -> { + if (reopenedIndices.contains(index)) { + assertIndexIsOpened(index); + } else { + assertIndexIsClosed(index); + } + }); + } + + private void createIndexWithDocs(final String indexName) { + createIndex(indexName); + final int nbDocs = scaledRandomIntBetween(1, 100); + for (int i = 0; i < nbDocs; i++) { + index(indexName, "_doc", String.valueOf(i), "num", i); + } + assertIndexIsOpened(indexName); + } + + /** + * Intercepts and blocks the {@link TransportVerifyShardBeforeCloseAction} executed for the given index pattern. + */ + private Releasable interceptVerifyShardBeforeCloseActions(final String indexPattern, final Runnable onIntercept) { + final MockTransportService mockTransportService = (MockTransportService) internalCluster() + .getInstance(TransportService.class, internalCluster().getMasterName()); + + final CountDownLatch release = new CountDownLatch(1); + for (DiscoveryNode node : internalCluster().clusterService().state().getNodes()) { + mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()), + (connection, requestId, action, request, options) -> { + if (action.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) { + if (request instanceof TransportVerifyShardBeforeCloseAction.ShardRequest) { + final String index = ((TransportVerifyShardBeforeCloseAction.ShardRequest) request).shardId().getIndexName(); + if (Glob.globMatch(indexPattern, index)) { + logger.info("request {} intercepted for index {}", requestId, index); + onIntercept.run(); + try { + release.await(); + logger.info("request {} released for index {}", requestId, index); + } catch (final InterruptedException e) { + throw new AssertionError(e); + } + } + } + + } + connection.sendRequest(requestId, action, request, options); + }); + } + final RunOnce releaseOnce = new RunOnce(release::countDown); + return releaseOnce::run; + } + + private static void assertIndexIsBlocked(final String... indices) { + final ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + for (String index : indices) { + assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN)); + assertThat(clusterState.routingTable().index(index), notNullValue()); + assertThat("Index " + index + " must have only 1 block with [id=" + INDEX_CLOSED_BLOCK_ID + "]", + clusterState.blocks().indices().getOrDefault(index, emptySet()).stream() + .filter(clusterBlock -> clusterBlock.id() == INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L)); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 4c5ed74e3ca..afdb14eaf6b 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -544,7 +544,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest logger.info("--> start snapshot with default settings and closed index - should be blocked"); assertBlocked(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") .setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed") - .setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + .setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID); logger.info("--> start snapshot with default settings without a closed index - should fail"); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 7c1cff09130..3461aac3ec2 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -1562,7 +1562,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> snapshot with closed index"); assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true) - .setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + .setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID); } public void testSnapshotSingleClosedIndex() throws Exception { @@ -1580,7 +1580,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> snapshot"); assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1") - .setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK); + .setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID); } public void testRenameOnRestore() throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java index 6005e7e6163..48d18e096bf 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java +++ b/test/framework/src/main/java/org/elasticsearch/test/hamcrest/ElasticsearchAssertions.java @@ -129,7 +129,7 @@ public class ElasticsearchAssertions { * @param builder the request builder */ public static void assertBlocked(ActionRequestBuilder builder) { - assertBlocked(builder, null); + assertBlocked(builder, (ClusterBlock) null); } /** @@ -155,9 +155,9 @@ public class ElasticsearchAssertions { * Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}. * * @param builder the request builder - * @param expectedBlock the expected block + * @param expectedBlockId the expected block id */ - public static void assertBlocked(ActionRequestBuilder builder, ClusterBlock expectedBlock) { + public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final Integer expectedBlockId) { try { builder.get(); fail("Request executed with success but a ClusterBlockException was expected"); @@ -165,19 +165,29 @@ public class ElasticsearchAssertions { assertThat(e.blocks().size(), greaterThan(0)); assertThat(e.status(), equalTo(RestStatus.FORBIDDEN)); - if (expectedBlock != null) { + if (expectedBlockId != null) { boolean found = false; for (ClusterBlock clusterBlock : e.blocks()) { - if (clusterBlock.id() == expectedBlock.id()) { + if (clusterBlock.id() == expectedBlockId) { found = true; break; } } - assertThat("Request should have been blocked by [" + expectedBlock + "] instead of " + e.blocks(), found, equalTo(true)); + assertThat("Request should have been blocked by [" + expectedBlockId + "] instead of " + e.blocks(), found, equalTo(true)); } } } + /** + * Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}. + * + * @param builder the request builder + * @param expectedBlock the expected block + */ + public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final ClusterBlock expectedBlock) { + assertBlocked(builder, expectedBlock != null ? expectedBlock.id() : null); + } + public static String formatShardStatus(BroadcastResponse response) { StringBuilder msg = new StringBuilder(); msg.append(" Total shards: ").append(response.getTotalShards())