[Close Index API] Add unique UUID to ClusterBlock (#36775)
This commit adds a unique id to cluster blocks, so that they can be uniquely identified if needed. This is important for the Close Index API where multiple concurrent closing requests can be executed at the same time. By adding a UUID to the cluster block, we can generate unique "closing block" that can later be verified on shards and then checked again from the cluster state before closing the index. When the verification on shard is done, the closing block is replaced by the regular INDEX_CLOSED_BLOCK instance. If something goes wrong, calling the Open Index API will remove the block. Related to #33888
This commit is contained in:
parent
f5af79b9cd
commit
e149b0852e
|
@ -20,7 +20,6 @@
|
|||
package org.elasticsearch.test.rest;
|
||||
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.apache.lucene.util.LuceneTestCase;
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.support.PlainActionFuture;
|
||||
import org.elasticsearch.client.Request;
|
||||
|
@ -41,7 +40,6 @@ import static org.hamcrest.Matchers.instanceOf;
|
|||
/**
|
||||
* Tests that wait for refresh is fired if the index is closed.
|
||||
*/
|
||||
@LuceneTestCase.AwaitsFix(bugUrl = "to be created")
|
||||
public class WaitForRefreshAndCloseIT extends ESRestTestCase {
|
||||
@Before
|
||||
public void setupIndex() throws IOException {
|
||||
|
|
|
@ -29,9 +29,10 @@ import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
|||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
|
@ -41,13 +42,14 @@ import org.elasticsearch.tasks.TaskId;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
|
||||
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
|
||||
|
||||
public static final String NAME = CloseIndexAction.NAME + "[s]";
|
||||
public static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
|
||||
|
||||
@Inject
|
||||
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
|
||||
|
@ -83,25 +85,25 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
|
|||
@Override
|
||||
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
|
||||
final IndexShard primary) throws Exception {
|
||||
executeShardOperation(primary);
|
||||
executeShardOperation(shardRequest, primary);
|
||||
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
|
||||
executeShardOperation(replica);
|
||||
executeShardOperation(shardRequest, replica);
|
||||
return new ReplicaResult();
|
||||
}
|
||||
|
||||
private void executeShardOperation(final IndexShard indexShard) {
|
||||
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
|
||||
final ShardId shardId = indexShard.shardId();
|
||||
if (indexShard.getActiveOperationsCount() != 0) {
|
||||
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
|
||||
}
|
||||
|
||||
final ClusterBlocks clusterBlocks = clusterService.state().blocks();
|
||||
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), EXPECTED_BLOCK) == false) {
|
||||
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + EXPECTED_BLOCK + " before closing");
|
||||
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
|
||||
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
|
||||
}
|
||||
|
||||
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
|
||||
|
@ -139,17 +141,36 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
|
|||
|
||||
public static class ShardRequest extends ReplicationRequest<ShardRequest> {
|
||||
|
||||
private ClusterBlock clusterBlock;
|
||||
|
||||
ShardRequest(){
|
||||
}
|
||||
|
||||
public ShardRequest(final ShardId shardId, final TaskId parentTaskId) {
|
||||
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
|
||||
super(shardId);
|
||||
this.clusterBlock = Objects.requireNonNull(clusterBlock);
|
||||
setParentTask(parentTaskId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "verify shard before close {" + shardId + "}";
|
||||
return "verify shard " + shardId + " before close with block " + clusterBlock;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFrom(final StreamInput in) throws IOException {
|
||||
super.readFrom(in);
|
||||
clusterBlock = ClusterBlock.readClusterBlock(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeTo(final StreamOutput out) throws IOException {
|
||||
super.writeTo(out);
|
||||
clusterBlock.writeTo(out);
|
||||
}
|
||||
|
||||
public ClusterBlock clusterBlock() {
|
||||
return clusterBlock;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
|
||||
package org.elasticsearch.cluster.block;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.io.stream.Streamable;
|
||||
|
@ -30,29 +32,31 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.Locale;
|
||||
import java.util.Objects;
|
||||
|
||||
public class ClusterBlock implements Streamable, ToXContentFragment {
|
||||
|
||||
private int id;
|
||||
|
||||
private @Nullable String uuid;
|
||||
private String description;
|
||||
|
||||
private EnumSet<ClusterBlockLevel> levels;
|
||||
|
||||
private boolean retryable;
|
||||
|
||||
private boolean disableStatePersistence = false;
|
||||
|
||||
private boolean allowReleaseResources;
|
||||
|
||||
private RestStatus status;
|
||||
|
||||
ClusterBlock() {
|
||||
private ClusterBlock() {
|
||||
}
|
||||
|
||||
public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources,
|
||||
RestStatus status, EnumSet<ClusterBlockLevel> levels) {
|
||||
public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence,
|
||||
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
|
||||
this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
|
||||
}
|
||||
|
||||
public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence,
|
||||
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
|
||||
this.id = id;
|
||||
this.uuid = uuid;
|
||||
this.description = description;
|
||||
this.retryable = retryable;
|
||||
this.disableStatePersistence = disableStatePersistence;
|
||||
|
@ -65,6 +69,10 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
|
|||
return this.id;
|
||||
}
|
||||
|
||||
public String uuid() {
|
||||
return uuid;
|
||||
}
|
||||
|
||||
public String description() {
|
||||
return this.description;
|
||||
}
|
||||
|
@ -104,6 +112,9 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
|
|||
@Override
|
||||
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
|
||||
builder.startObject(Integer.toString(id));
|
||||
if (uuid != null) {
|
||||
builder.field("uuid", uuid);
|
||||
}
|
||||
builder.field("description", description);
|
||||
builder.field("retryable", retryable);
|
||||
if (disableStatePersistence) {
|
||||
|
@ -127,6 +138,11 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
|
|||
@Override
|
||||
public void readFrom(StreamInput in) throws IOException {
|
||||
id = in.readVInt();
|
||||
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
uuid = in.readOptionalString();
|
||||
} else {
|
||||
uuid = null;
|
||||
}
|
||||
description = in.readString();
|
||||
final int len = in.readVInt();
|
||||
ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
|
||||
|
@ -143,6 +159,9 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
|
|||
@Override
|
||||
public void writeTo(StreamOutput out) throws IOException {
|
||||
out.writeVInt(id);
|
||||
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
|
||||
out.writeOptionalString(uuid);
|
||||
}
|
||||
out.writeString(description);
|
||||
out.writeVInt(levels.size());
|
||||
for (ClusterBlockLevel level : levels) {
|
||||
|
@ -157,7 +176,11 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
|
|||
@Override
|
||||
public String toString() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append(id).append(",").append(description).append(", blocks ");
|
||||
sb.append(id).append(",");
|
||||
if (uuid != null) {
|
||||
sb.append(uuid).append(',');
|
||||
}
|
||||
sb.append(description).append(", blocks ");
|
||||
String delimiter = "";
|
||||
for (ClusterBlockLevel level : levels) {
|
||||
sb.append(delimiter).append(level.name());
|
||||
|
@ -168,19 +191,19 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
|
|||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
if (o == null || getClass() != o.getClass()) return false;
|
||||
|
||||
ClusterBlock that = (ClusterBlock) o;
|
||||
|
||||
if (id != that.id) return false;
|
||||
|
||||
return true;
|
||||
if (this == o) {
|
||||
return true;
|
||||
}
|
||||
if (o == null || getClass() != o.getClass()) {
|
||||
return false;
|
||||
}
|
||||
final ClusterBlock that = (ClusterBlock) o;
|
||||
return id == that.id && Objects.equals(uuid, that.uuid);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return id;
|
||||
return Objects.hash(id, uuid);
|
||||
}
|
||||
|
||||
public boolean isAllowReleaseResources() {
|
||||
|
|
|
@ -24,6 +24,7 @@ import org.elasticsearch.cluster.AbstractDiffable;
|
|||
import org.elasticsearch.cluster.Diff;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
import org.elasticsearch.common.Nullable;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
|
@ -147,6 +148,31 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
|
|||
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
|
||||
}
|
||||
|
||||
public boolean hasIndexBlockWithId(String index, int blockId) {
|
||||
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
|
||||
if (clusterBlocks != null) {
|
||||
for (ClusterBlock clusterBlock : clusterBlocks) {
|
||||
if (clusterBlock.id() == blockId) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
public ClusterBlock getIndexBlockWithId(final String index, final int blockId) {
|
||||
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
|
||||
if (clusterBlocks != null) {
|
||||
for (ClusterBlock clusterBlock : clusterBlocks) {
|
||||
if (clusterBlock.id() == blockId) {
|
||||
return clusterBlock;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException {
|
||||
ClusterBlockException blockException = globalBlockedException(level);
|
||||
if (blockException != null) {
|
||||
|
@ -403,6 +429,18 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
|
|||
return this;
|
||||
}
|
||||
|
||||
public Builder removeIndexBlockWithId(String index, int blockId) {
|
||||
final Set<ClusterBlock> indexBlocks = indices.get(index);
|
||||
if (indexBlocks == null) {
|
||||
return this;
|
||||
}
|
||||
indexBlocks.removeIf(block -> block.id() == blockId);
|
||||
if (indexBlocks.isEmpty()) {
|
||||
indices.remove(index);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
public ClusterBlocks build() {
|
||||
// We copy the block sets here in case of the builder is modified after build is called
|
||||
ImmutableOpenMap.Builder<String, Set<ClusterBlock>> indicesBuilder = ImmutableOpenMap.builder(indices.size());
|
||||
|
|
|
@ -47,6 +47,8 @@ import org.elasticsearch.cluster.routing.RoutingTable;
|
|||
import org.elasticsearch.cluster.routing.allocation.AllocationService;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.Priority;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.ValidationException;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
|
@ -67,6 +69,8 @@ import org.elasticsearch.threadpool.ThreadPool;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -83,6 +87,7 @@ import static java.util.Collections.unmodifiableMap;
|
|||
public class MetaDataIndexStateService {
|
||||
private static final Logger logger = LogManager.getLogger(MetaDataIndexStateService.class);
|
||||
|
||||
public static final int INDEX_CLOSED_BLOCK_ID = 4;
|
||||
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false,
|
||||
false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
|
||||
|
||||
|
@ -123,11 +128,11 @@ public class MetaDataIndexStateService {
|
|||
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
|
||||
new ClusterStateUpdateTask(Priority.URGENT) {
|
||||
|
||||
private final Set<Index> blockedIndices = new HashSet<>();
|
||||
private final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
return addIndexClosedBlocks(concreteIndices, currentState, blockedIndices);
|
||||
return addIndexClosedBlocks(concreteIndices, blockedIndices, currentState);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -139,11 +144,21 @@ public class MetaDataIndexStateService {
|
|||
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
|
||||
threadPool.executor(ThreadPool.Names.MANAGEMENT)
|
||||
.execute(new WaitForClosedBlocksApplied(blockedIndices, request,
|
||||
ActionListener.wrap(closedBlocksResults ->
|
||||
ActionListener.wrap(results ->
|
||||
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
|
||||
|
||||
boolean acknowledged = true;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) throws Exception {
|
||||
final ClusterState updatedState = closeRoutingTable(currentState, closedBlocksResults);
|
||||
final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results);
|
||||
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
|
||||
IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey());
|
||||
if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
acknowledged = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
return allocationService.reroute(updatedState, "indices closed");
|
||||
}
|
||||
|
||||
|
@ -155,8 +170,6 @@ public class MetaDataIndexStateService {
|
|||
@Override
|
||||
public void clusterStateProcessed(final String source,
|
||||
final ClusterState oldState, final ClusterState newState) {
|
||||
boolean acknowledged = closedBlocksResults.values().stream()
|
||||
.allMatch(AcknowledgedResponse::isAcknowledged);
|
||||
listener.onResponse(new AcknowledgedResponse(acknowledged));
|
||||
}
|
||||
}),
|
||||
|
@ -182,11 +195,12 @@ public class MetaDataIndexStateService {
|
|||
/**
|
||||
* Step 1 - Start closing indices by adding a write block
|
||||
*
|
||||
* This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds the index block
|
||||
* {@link #INDEX_CLOSED_BLOCK} to every index to close in the cluster state. After the cluster state is published, the shards should
|
||||
* start to reject writing operations and we can proceed with step 2.
|
||||
* This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds a unique cluster
|
||||
* block (or reuses an existing one) to every index to close in the cluster state. After the cluster state is published, the shards
|
||||
* should start to reject writing operations and we can proceed with step 2.
|
||||
*/
|
||||
static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState currentState, final Set<Index> blockedIndices) {
|
||||
static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index, ClusterBlock> blockedIndices,
|
||||
final ClusterState currentState) {
|
||||
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
|
||||
|
||||
final Set<IndexMetaData> indicesToClose = new HashSet<>();
|
||||
|
@ -196,6 +210,7 @@ public class MetaDataIndexStateService {
|
|||
indicesToClose.add(indexMetaData);
|
||||
} else {
|
||||
logger.debug("index {} is already closed, ignoring", index);
|
||||
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -217,19 +232,37 @@ public class MetaDataIndexStateService {
|
|||
|
||||
for (IndexMetaData indexToClose : indicesToClose) {
|
||||
final Index index = indexToClose.getIndex();
|
||||
if (currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK) == false) {
|
||||
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
|
||||
|
||||
ClusterBlock indexBlock = null;
|
||||
final Set<ClusterBlock> clusterBlocks = currentState.blocks().indices().get(index.getName());
|
||||
if (clusterBlocks != null) {
|
||||
for (ClusterBlock clusterBlock : clusterBlocks) {
|
||||
if (clusterBlock.id() == INDEX_CLOSED_BLOCK_ID) {
|
||||
// Reuse the existing index closed block
|
||||
indexBlock = clusterBlock;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (useDirectClose) {
|
||||
logger.debug("closing index {} directly", index);
|
||||
metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE));
|
||||
metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); // increment version?
|
||||
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
|
||||
routingTable.remove(index.getName());
|
||||
indexBlock = INDEX_CLOSED_BLOCK;
|
||||
} else {
|
||||
if (indexBlock == null) {
|
||||
// Create a new index closed block
|
||||
indexBlock = createIndexClosingBlock();
|
||||
}
|
||||
assert Strings.hasLength(indexBlock.uuid()) : "Closing block should have a UUID";
|
||||
}
|
||||
blockedIndices.add(index);
|
||||
blocks.addIndexBlock(index.getName(), indexBlock);
|
||||
blockedIndices.put(index, indexBlock);
|
||||
}
|
||||
|
||||
logger.info(() -> new ParameterizedMessage("closing indices {}",
|
||||
blockedIndices.stream().map(Object::toString).collect(Collectors.joining(","))));
|
||||
blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.joining(","))));
|
||||
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
|
||||
}
|
||||
|
||||
|
@ -242,15 +275,15 @@ public class MetaDataIndexStateService {
|
|||
*/
|
||||
class WaitForClosedBlocksApplied extends AbstractRunnable {
|
||||
|
||||
private final Set<Index> blockedIndices;
|
||||
private final Map<Index, ClusterBlock> blockedIndices;
|
||||
private final CloseIndexClusterStateUpdateRequest request;
|
||||
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;
|
||||
|
||||
private WaitForClosedBlocksApplied(final Set<Index> blockedIndices,
|
||||
private WaitForClosedBlocksApplied(final Map<Index, ClusterBlock> blockedIndices,
|
||||
final CloseIndexClusterStateUpdateRequest request,
|
||||
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
|
||||
if (blockedIndices == null || blockedIndices.isEmpty()) {
|
||||
throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices");
|
||||
throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null");
|
||||
}
|
||||
this.blockedIndices = blockedIndices;
|
||||
this.request = request;
|
||||
|
@ -267,18 +300,18 @@ public class MetaDataIndexStateService {
|
|||
final Map<Index, AcknowledgedResponse> results = ConcurrentCollections.newConcurrentMap();
|
||||
final CountDown countDown = new CountDown(blockedIndices.size());
|
||||
final ClusterState state = clusterService.state();
|
||||
for (Index blockedIndex : blockedIndices) {
|
||||
waitForShardsReadyForClosing(blockedIndex, state, response -> {
|
||||
results.put(blockedIndex, response);
|
||||
blockedIndices.forEach((index, block) -> {
|
||||
waitForShardsReadyForClosing(index, block, state, response -> {
|
||||
results.put(index, response);
|
||||
if (countDown.countDown()) {
|
||||
listener.onResponse(unmodifiableMap(results));
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void waitForShardsReadyForClosing(final Index index, final ClusterState state,
|
||||
final Consumer<AcknowledgedResponse> onResponse) {
|
||||
private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock,
|
||||
final ClusterState state, final Consumer<AcknowledgedResponse> onResponse) {
|
||||
final IndexMetaData indexMetaData = state.metaData().index(index);
|
||||
if (indexMetaData == null) {
|
||||
logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index);
|
||||
|
@ -287,6 +320,7 @@ public class MetaDataIndexStateService {
|
|||
}
|
||||
final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
|
||||
if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
|
||||
assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
|
||||
logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
|
||||
onResponse.accept(new AcknowledgedResponse(true));
|
||||
return;
|
||||
|
@ -299,7 +333,7 @@ public class MetaDataIndexStateService {
|
|||
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
|
||||
final IndexShardRoutingTable shardRoutingTable = shard.value;
|
||||
final ShardId shardId = shardRoutingTable.shardId();
|
||||
sendVerifyShardBeforeCloseRequest(shardRoutingTable, new NotifyOnceListener<ReplicationResponse>() {
|
||||
sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener<ReplicationResponse>() {
|
||||
@Override
|
||||
public void innerOnResponse(final ReplicationResponse replicationResponse) {
|
||||
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
|
||||
|
@ -324,6 +358,7 @@ public class MetaDataIndexStateService {
|
|||
}
|
||||
|
||||
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable,
|
||||
final ClusterBlock closingBlock,
|
||||
final ActionListener<ReplicationResponse> listener) {
|
||||
final ShardId shardId = shardRoutingTable.shardId();
|
||||
if (shardRoutingTable.primaryShard().unassigned()) {
|
||||
|
@ -335,11 +370,10 @@ public class MetaDataIndexStateService {
|
|||
}
|
||||
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
|
||||
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
|
||||
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, parentTaskId);
|
||||
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId);
|
||||
if (request.ackTimeout() != null) {
|
||||
shardRequest.timeout(request.ackTimeout());
|
||||
}
|
||||
// TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests
|
||||
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
|
||||
}
|
||||
}
|
||||
|
@ -347,7 +381,9 @@ public class MetaDataIndexStateService {
|
|||
/**
|
||||
* Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing.
|
||||
*/
|
||||
static ClusterState closeRoutingTable(final ClusterState currentState, final Map<Index, AcknowledgedResponse> results) {
|
||||
static ClusterState closeRoutingTable(final ClusterState currentState,
|
||||
final Map<Index, ClusterBlock> blockedIndices,
|
||||
final Map<Index, AcknowledgedResponse> results) {
|
||||
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
|
||||
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
|
||||
|
@ -355,21 +391,29 @@ public class MetaDataIndexStateService {
|
|||
final Set<String> closedIndices = new HashSet<>();
|
||||
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
|
||||
final Index index = result.getKey();
|
||||
final boolean acknowledged = result.getValue().isAcknowledged();
|
||||
try {
|
||||
final IndexMetaData indexMetaData = metadata.getSafe(index);
|
||||
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
|
||||
if (result.getValue().isAcknowledged()) {
|
||||
logger.debug("closing index {} succeed, removing index routing table", index);
|
||||
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
|
||||
routingTable.remove(index.getName());
|
||||
closedIndices.add(index.getName());
|
||||
} else {
|
||||
logger.debug("closing index {} failed, removing index block because: {}", index, result.getValue());
|
||||
blocks.removeIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
|
||||
}
|
||||
} else {
|
||||
logger.debug("index {} has been closed since it was blocked before closing, ignoring", index);
|
||||
if (acknowledged == false) {
|
||||
logger.debug("verification of shards before closing {} failed", index);
|
||||
continue;
|
||||
}
|
||||
final IndexMetaData indexMetaData = metadata.getSafe(index);
|
||||
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
|
||||
logger.debug("verification of shards before closing {} succeeded but index is already closed", index);
|
||||
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
|
||||
continue;
|
||||
}
|
||||
final ClusterBlock closingBlock = blockedIndices.get(index);
|
||||
if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) {
|
||||
logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index);
|
||||
continue;
|
||||
}
|
||||
|
||||
logger.debug("closing index {} succeeded", index);
|
||||
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
|
||||
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
|
||||
routingTable.remove(index.getName());
|
||||
closedIndices.add(index.getName());
|
||||
} catch (final IndexNotFoundException e) {
|
||||
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
|
||||
}
|
||||
|
@ -405,64 +449,73 @@ public class MetaDataIndexStateService {
|
|||
|
||||
final String indicesAsString = Arrays.toString(request.indices());
|
||||
clusterService.submitStateUpdateTask("open-indices " + indicesAsString,
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
|
||||
@Override
|
||||
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
||||
return new ClusterStateUpdateResponse(acknowledged);
|
||||
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
|
||||
@Override
|
||||
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
|
||||
return new ClusterStateUpdateResponse(acknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(final ClusterState currentState) {
|
||||
final ClusterState updatedState = openIndices(request.indices(), currentState);
|
||||
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
|
||||
return allocationService.reroute(updatedState, "indices opened [" + indicesAsString + "]");
|
||||
}
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
ClusterState openIndices(final Index[] indices, final ClusterState currentState) {
|
||||
final List<IndexMetaData> indicesToOpen = new ArrayList<>();
|
||||
for (Index index : indices) {
|
||||
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
|
||||
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
|
||||
indicesToOpen.add(indexMetaData);
|
||||
} else if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) {
|
||||
indicesToOpen.add(indexMetaData);
|
||||
}
|
||||
}
|
||||
|
||||
validateShardLimit(currentState, indices);
|
||||
if (indicesToOpen.isEmpty()) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
logger.info(() -> new ParameterizedMessage("opening indices [{}]",
|
||||
String.join(",", indicesToOpen.stream().map(i -> (CharSequence) i.getIndex().toString())::iterator)));
|
||||
|
||||
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
|
||||
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
|
||||
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();
|
||||
|
||||
for (IndexMetaData indexMetaData : indicesToOpen) {
|
||||
final Index index = indexMetaData.getIndex();
|
||||
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
|
||||
IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build();
|
||||
// The index might be closed because we couldn't import it due to old incompatible version
|
||||
// We need to check that this index can be upgraded to the current version
|
||||
updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion);
|
||||
try {
|
||||
indicesService.verifyIndexMetadata(updatedIndexMetaData, updatedIndexMetaData);
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Failed to verify index " + index, e);
|
||||
}
|
||||
metadata.put(updatedIndexMetaData, true);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
List<IndexMetaData> indicesToOpen = new ArrayList<>();
|
||||
for (Index index : request.indices()) {
|
||||
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
|
||||
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
|
||||
indicesToOpen.add(indexMetaData);
|
||||
}
|
||||
}
|
||||
// Always removes index closed blocks (note: this can fail on-going close index actions)
|
||||
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
|
||||
}
|
||||
|
||||
validateShardLimit(currentState, request.indices());
|
||||
ClusterState updatedState = ClusterState.builder(currentState).metaData(metadata).blocks(blocks).build();
|
||||
|
||||
if (indicesToOpen.isEmpty()) {
|
||||
return currentState;
|
||||
}
|
||||
|
||||
logger.info("opening indices [{}]", indicesAsString);
|
||||
|
||||
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
|
||||
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
|
||||
.blocks(currentState.blocks());
|
||||
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
|
||||
.minimumIndexCompatibilityVersion();
|
||||
for (IndexMetaData closedMetaData : indicesToOpen) {
|
||||
final String indexName = closedMetaData.getIndex().getName();
|
||||
IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build();
|
||||
// The index might be closed because we couldn't import it due to old incompatible version
|
||||
// We need to check that this index can be upgraded to the current version
|
||||
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion);
|
||||
try {
|
||||
indicesService.verifyIndexMetadata(indexMetaData, indexMetaData);
|
||||
} catch (Exception e) {
|
||||
throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e);
|
||||
}
|
||||
|
||||
mdBuilder.put(indexMetaData, true);
|
||||
blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK);
|
||||
}
|
||||
|
||||
ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();
|
||||
|
||||
RoutingTable.Builder rtBuilder = RoutingTable.builder(updatedState.routingTable());
|
||||
for (IndexMetaData index : indicesToOpen) {
|
||||
rtBuilder.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(index.getIndex()));
|
||||
}
|
||||
|
||||
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
|
||||
return allocationService.reroute(
|
||||
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
|
||||
"indices opened [" + indicesAsString + "]");
|
||||
final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
|
||||
for (IndexMetaData previousIndexMetaData : indicesToOpen) {
|
||||
if (previousIndexMetaData.getState() != IndexMetaData.State.OPEN) {
|
||||
routingTable.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(previousIndexMetaData.getIndex()));
|
||||
}
|
||||
});
|
||||
}
|
||||
return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -492,4 +545,14 @@ public class MetaDataIndexStateService {
|
|||
return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The
|
||||
* cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID.
|
||||
*/
|
||||
public static ClusterBlock createIndexClosingBlock() {
|
||||
return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), "index preparing to close. Reopen the index to allow " +
|
||||
"writes again or retry closing the index to fully close the index.", false, false, false, RestStatus.FORBIDDEN,
|
||||
EnumSet.of(ClusterBlockLevel.WRITE));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -29,9 +29,11 @@ import org.elasticsearch.action.support.replication.TransportReplicationAction.C
|
|||
import org.elasticsearch.cluster.ClusterName;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
|
||||
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.routing.ShardRoutingState;
|
||||
|
@ -61,7 +63,6 @@ import java.util.Set;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||
import static org.hamcrest.Matchers.arrayWithSize;
|
||||
|
@ -81,6 +82,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
|||
private IndexShard indexShard;
|
||||
private TransportVerifyShardBeforeCloseAction action;
|
||||
private ClusterService clusterService;
|
||||
private ClusterBlock clusterBlock;
|
||||
private CapturingTransport transport;
|
||||
|
||||
@BeforeClass
|
||||
|
@ -102,8 +104,10 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
|||
when(indexShard.shardId()).thenReturn(shardId);
|
||||
|
||||
clusterService = createClusterService(threadPool);
|
||||
|
||||
clusterBlock = MetaDataIndexStateService.createIndexClosingBlock();
|
||||
setState(clusterService, new ClusterState.Builder(clusterService.state())
|
||||
.blocks(ClusterBlocks.builder().addIndexBlock("index", INDEX_CLOSED_BLOCK).build()).build());
|
||||
.blocks(ClusterBlocks.builder().blocks(clusterService.state().blocks()).addIndexBlock("index", clusterBlock).build()).build());
|
||||
|
||||
transport = new CapturingTransport();
|
||||
TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
|
||||
|
@ -130,8 +134,9 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private void executeOnPrimaryOrReplica() throws Exception {
|
||||
final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong());
|
||||
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
|
||||
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), new TaskId("_node_id", randomNonNegativeLong()));
|
||||
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId);
|
||||
if (randomBoolean()) {
|
||||
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
|
||||
} else {
|
||||
|
@ -158,7 +163,7 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
|||
|
||||
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
|
||||
assertThat(exception.getMessage(),
|
||||
equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + INDEX_CLOSED_BLOCK + " before closing"));
|
||||
equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing"));
|
||||
verify(indexShard, times(0)).flush(any(FlushRequest.class));
|
||||
}
|
||||
|
||||
|
@ -205,8 +210,9 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
|
|||
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));
|
||||
|
||||
final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
|
||||
TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L);
|
||||
TransportVerifyShardBeforeCloseAction.ShardRequest request =
|
||||
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, new TaskId(clusterService.localNode().getId(), 0L));
|
||||
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId);
|
||||
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
|
||||
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
|
||||
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
|
||||
|
|
|
@ -20,36 +20,35 @@
|
|||
package org.elasticsearch.cluster.block;
|
||||
|
||||
import org.elasticsearch.Version;
|
||||
import org.elasticsearch.common.UUIDs;
|
||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||
import org.elasticsearch.common.io.stream.BytesStreamOutput;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.rest.RestStatus;
|
||||
import org.elasticsearch.test.ESTestCase;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
||||
import static java.util.EnumSet.copyOf;
|
||||
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersion;
|
||||
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
|
||||
import static org.hamcrest.CoreMatchers.endsWith;
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.not;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.isOneOf;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
|
||||
public class ClusterBlockTests extends ESTestCase {
|
||||
|
||||
public void testSerialization() throws Exception {
|
||||
int iterations = randomIntBetween(10, 100);
|
||||
int iterations = randomIntBetween(5, 20);
|
||||
for (int i = 0; i < iterations; i++) {
|
||||
// Get a random version
|
||||
Version version = randomVersion(random());
|
||||
|
||||
// Get a random list of ClusterBlockLevels
|
||||
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
|
||||
int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length);
|
||||
for (int j = 0; j < nbLevels; j++) {
|
||||
levels.add(randomFrom(ClusterBlockLevel.values()));
|
||||
}
|
||||
|
||||
ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
|
||||
randomBoolean(), false, randomFrom(RestStatus.values()), levels);
|
||||
ClusterBlock clusterBlock = randomClusterBlock(version);
|
||||
|
||||
BytesStreamOutput out = new BytesStreamOutput();
|
||||
out.setVersion(version);
|
||||
|
@ -59,37 +58,133 @@ public class ClusterBlockTests extends ESTestCase {
|
|||
in.setVersion(version);
|
||||
ClusterBlock result = ClusterBlock.readClusterBlock(in);
|
||||
|
||||
assertThat(result.id(), equalTo(clusterBlock.id()));
|
||||
assertThat(result.status(), equalTo(clusterBlock.status()));
|
||||
assertThat(result.description(), equalTo(clusterBlock.description()));
|
||||
assertThat(result.retryable(), equalTo(clusterBlock.retryable()));
|
||||
assertThat(result.disableStatePersistence(), equalTo(clusterBlock.disableStatePersistence()));
|
||||
assertArrayEquals(result.levels().toArray(), clusterBlock.levels().toArray());
|
||||
assertClusterBlockEquals(clusterBlock, result);
|
||||
}
|
||||
}
|
||||
|
||||
public void testBwcSerialization() throws Exception {
|
||||
for (int runs = 0; runs < randomIntBetween(5, 20); runs++) {
|
||||
// Generate a random cluster block in version < 7.0.0
|
||||
final Version version = randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0));
|
||||
final ClusterBlock expected = randomClusterBlock(version);
|
||||
assertNull(expected.uuid());
|
||||
|
||||
// Serialize to node in current version
|
||||
final BytesStreamOutput out = new BytesStreamOutput();
|
||||
expected.writeTo(out);
|
||||
|
||||
// Deserialize and check the cluster block
|
||||
final ClusterBlock actual = ClusterBlock.readClusterBlock(out.bytes().streamInput());
|
||||
assertClusterBlockEquals(expected, actual);
|
||||
}
|
||||
|
||||
for (int runs = 0; runs < randomIntBetween(5, 20); runs++) {
|
||||
// Generate a random cluster block in current version
|
||||
final ClusterBlock expected = randomClusterBlock(Version.CURRENT);
|
||||
|
||||
// Serialize to node in version < 7.0.0
|
||||
final BytesStreamOutput out = new BytesStreamOutput();
|
||||
out.setVersion(randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0)));
|
||||
expected.writeTo(out);
|
||||
|
||||
// Deserialize and check the cluster block
|
||||
final StreamInput in = out.bytes().streamInput();
|
||||
in.setVersion(out.getVersion());
|
||||
final ClusterBlock actual = ClusterBlock.readClusterBlock(in);
|
||||
|
||||
assertThat(actual.id(), equalTo(expected.id()));
|
||||
assertThat(actual.status(), equalTo(expected.status()));
|
||||
assertThat(actual.description(), equalTo(expected.description()));
|
||||
assertThat(actual.retryable(), equalTo(expected.retryable()));
|
||||
assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence()));
|
||||
assertArrayEquals(actual.levels().toArray(), expected.levels().toArray());
|
||||
}
|
||||
}
|
||||
|
||||
public void testToStringDanglingComma() {
|
||||
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
|
||||
int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length);
|
||||
for (int j = 0; j < nbLevels; j++) {
|
||||
levels.add(randomFrom(ClusterBlockLevel.values()));
|
||||
}
|
||||
ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
|
||||
randomBoolean(), false, randomFrom(RestStatus.values()), levels);
|
||||
final ClusterBlock clusterBlock = randomClusterBlock();
|
||||
assertThat(clusterBlock.toString(), not(endsWith(",")));
|
||||
}
|
||||
|
||||
public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
|
||||
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
|
||||
int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length);
|
||||
for (int j = 0; j < nbLevels; j++) {
|
||||
levels.add(randomFrom(ClusterBlockLevel.values()));
|
||||
}
|
||||
ClusterBlock globalBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
|
||||
randomBoolean(), false, randomFrom(RestStatus.values()), levels);
|
||||
ClusterBlock globalBlock = randomClusterBlock();
|
||||
ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), ImmutableOpenMap.of());
|
||||
ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
|
||||
assertNotNull(exception);
|
||||
assertEquals(exception.blocks(), Collections.singleton(globalBlock));
|
||||
}
|
||||
|
||||
public void testRemoveIndexBlockWithId() {
|
||||
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
|
||||
builder.addIndexBlock("index-1",
|
||||
new ClusterBlock(1, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
|
||||
builder.addIndexBlock("index-1",
|
||||
new ClusterBlock(2, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
|
||||
builder.addIndexBlock("index-1",
|
||||
new ClusterBlock(3, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
|
||||
builder.addIndexBlock("index-1",
|
||||
new ClusterBlock(3, "other uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
|
||||
|
||||
builder.addIndexBlock("index-2",
|
||||
new ClusterBlock(3, "uuid3", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
|
||||
|
||||
ClusterBlocks clusterBlocks = builder.build();
|
||||
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(4));
|
||||
assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1));
|
||||
|
||||
builder.removeIndexBlockWithId("index-1", 3);
|
||||
clusterBlocks = builder.build();
|
||||
|
||||
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2));
|
||||
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true));
|
||||
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true));
|
||||
assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1));
|
||||
assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(true));
|
||||
|
||||
builder.removeIndexBlockWithId("index-2", 3);
|
||||
clusterBlocks = builder.build();
|
||||
|
||||
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2));
|
||||
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true));
|
||||
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true));
|
||||
assertThat(clusterBlocks.indices().get("index-2"), nullValue());
|
||||
assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(false));
|
||||
}
|
||||
|
||||
public void testGetIndexBlockWithId() {
|
||||
final int blockId = randomInt();
|
||||
final ClusterBlock[] clusterBlocks = new ClusterBlock[randomIntBetween(1, 5)];
|
||||
|
||||
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
|
||||
for (int i = 0; i < clusterBlocks.length; i++) {
|
||||
clusterBlocks[i] = new ClusterBlock(blockId, "uuid" + i, "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL));
|
||||
builder.addIndexBlock("index", clusterBlocks[i]);
|
||||
}
|
||||
|
||||
assertThat(builder.build().indices().get("index").size(), equalTo(clusterBlocks.length));
|
||||
assertThat(builder.build().getIndexBlockWithId("index", blockId), isOneOf(clusterBlocks));
|
||||
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
|
||||
}
|
||||
|
||||
private ClusterBlock randomClusterBlock() {
|
||||
return randomClusterBlock(randomVersion(random()));
|
||||
}
|
||||
|
||||
private ClusterBlock randomClusterBlock(final Version version) {
|
||||
final String uuid = (version.onOrAfter(Version.V_7_0_0) && randomBoolean()) ? UUIDs.randomBase64UUID() : null;
|
||||
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
|
||||
return new ClusterBlock(randomInt(), uuid, "cluster block #" + randomInt(), randomBoolean(), randomBoolean(), randomBoolean(),
|
||||
randomFrom(RestStatus.values()), copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels)));
|
||||
}
|
||||
|
||||
private void assertClusterBlockEquals(final ClusterBlock expected, final ClusterBlock actual) {
|
||||
assertEquals(expected, actual);
|
||||
assertThat(actual.id(), equalTo(expected.id()));
|
||||
assertThat(actual.uuid(), equalTo(expected.uuid()));
|
||||
assertThat(actual.status(), equalTo(expected.status()));
|
||||
assertThat(actual.description(), equalTo(expected.description()));
|
||||
assertThat(actual.retryable(), equalTo(expected.retryable()));
|
||||
assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence()));
|
||||
assertArrayEquals(actual.levels().toArray(), expected.levels().toArray());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,9 +56,14 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static java.util.Collections.emptyMap;
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.unmodifiableMap;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID;
|
||||
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
|
||||
import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
|
@ -73,32 +78,37 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
|
||||
public void testCloseRoutingTable() {
|
||||
final Set<Index> nonBlockedIndices = new HashSet<>();
|
||||
final Map<Index, AcknowledgedResponse> blockedIndices = new HashMap<>();
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
final Map<Index, AcknowledgedResponse> results = new HashMap<>();
|
||||
|
||||
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build();
|
||||
for (int i = 0; i < randomIntBetween(1, 25); i++) {
|
||||
final String indexName = randomAlphaOfLengthBetween(5, 15);
|
||||
final String indexName = "index-" + i;
|
||||
|
||||
if (randomBoolean()) {
|
||||
state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
|
||||
nonBlockedIndices.add(state.metaData().index(indexName).getIndex());
|
||||
} else {
|
||||
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
|
||||
blockedIndices.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
|
||||
final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock();
|
||||
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock);
|
||||
blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock);
|
||||
results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
|
||||
}
|
||||
}
|
||||
|
||||
final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices);
|
||||
final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
|
||||
assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size()));
|
||||
|
||||
for (Index nonBlockedIndex : nonBlockedIndices) {
|
||||
assertIsOpened(nonBlockedIndex.getName(), updatedState);
|
||||
assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false));
|
||||
}
|
||||
for (Map.Entry<Index, AcknowledgedResponse> blockedIndex : blockedIndices.entrySet()) {
|
||||
if (blockedIndex.getValue().isAcknowledged()) {
|
||||
assertIsClosed(blockedIndex.getKey().getName(), updatedState);
|
||||
for (Index blockedIndex : blockedIndices.keySet()) {
|
||||
if (results.get(blockedIndex).isAcknowledged()) {
|
||||
assertIsClosed(blockedIndex.getName(), updatedState);
|
||||
} else {
|
||||
assertIsOpened(blockedIndex.getKey().getName(), updatedState);
|
||||
assertIsOpened(blockedIndex.getName(), updatedState);
|
||||
assertThat(updatedState.blocks().hasIndexBlockWithId(blockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -106,39 +116,45 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
public void testAddIndexClosedBlocks() {
|
||||
final ClusterState initialState = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build();
|
||||
{
|
||||
final Set<Index> blockedIndices = new HashSet<>();
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
Index[] indices = new Index[]{new Index("_name", "_uid")};
|
||||
expectThrows(IndexNotFoundException.class, () ->
|
||||
MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{new Index("_name", "_uid")}, initialState, blockedIndices));
|
||||
MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState));
|
||||
assertTrue(blockedIndices.isEmpty());
|
||||
}
|
||||
{
|
||||
final Set<Index> blockedIndices = new HashSet<>();
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
Index[] indices = Index.EMPTY_ARRAY;
|
||||
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, initialState, blockedIndices);
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState);
|
||||
assertSame(initialState, updatedState);
|
||||
assertTrue(blockedIndices.isEmpty());
|
||||
}
|
||||
{
|
||||
final Set<Index> blockedIndices = new HashSet<>();
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
|
||||
Index[] indices = new Index[]{state.metaData().index("closed").getIndex()};
|
||||
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
|
||||
assertSame(state, updatedState);
|
||||
assertTrue(blockedIndices.isEmpty());
|
||||
|
||||
}
|
||||
{
|
||||
final Set<Index> blockedIndices = new HashSet<>();
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
|
||||
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
|
||||
Index[] indices = new Index[]{state.metaData().index("opened").getIndex(), state.metaData().index("closed").getIndex()};
|
||||
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
|
||||
assertNotSame(state, updatedState);
|
||||
assertTrue(blockedIndices.contains(updatedState.metaData().index("opened").getIndex()));
|
||||
assertFalse(blockedIndices.contains(updatedState.metaData().index("closed").getIndex()));
|
||||
assertIsBlocked("opened", updatedState, true);
|
||||
|
||||
Index opened = updatedState.metaData().index("opened").getIndex();
|
||||
assertTrue(blockedIndices.containsKey(opened));
|
||||
assertHasBlock("opened", updatedState, blockedIndices.get(opened));
|
||||
|
||||
Index closed = updatedState.metaData().index("closed").getIndex();
|
||||
assertFalse(blockedIndices.containsKey(closed));
|
||||
}
|
||||
{
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
|
||||
|
@ -150,7 +166,7 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
|
||||
}
|
||||
Index[] indices = new Index[]{state.metaData().index("restored").getIndex()};
|
||||
MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>());
|
||||
MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state);
|
||||
});
|
||||
assertThat(exception.getMessage(), containsString("Cannot close indices that are being restored: [[restored]]"));
|
||||
}
|
||||
|
@ -164,12 +180,12 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
|
||||
}
|
||||
Index[] indices = new Index[]{state.metaData().index("snapshotted").getIndex()};
|
||||
MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>());
|
||||
MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state);
|
||||
});
|
||||
assertThat(exception.getMessage(), containsString("Cannot close indices that are being snapshotted: [[snapshotted]]"));
|
||||
}
|
||||
{
|
||||
final Set<Index> blockedIndices = new HashSet<>();
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
ClusterState state = addOpenedIndex("index-1", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
|
||||
state = addOpenedIndex("index-2", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
|
||||
state = addOpenedIndex("index-3", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
|
||||
|
@ -177,30 +193,51 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
if (mixedVersions) {
|
||||
state = ClusterState.builder(state)
|
||||
.nodes(DiscoveryNodes.builder(state.nodes())
|
||||
.add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
|
||||
.add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), emptyMap(),
|
||||
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_6_0_0)))
|
||||
.build();
|
||||
}
|
||||
Index[] indices = new Index[]{state.metaData().index("index-1").getIndex(),
|
||||
state.metaData().index("index-2").getIndex(), state.metaData().index("index-3").getIndex()};
|
||||
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
|
||||
Index index1 = state.metaData().index("index-1").getIndex();
|
||||
Index index2 = state.metaData().index("index-2").getIndex();
|
||||
Index index3 = state.metaData().index("index-3").getIndex();
|
||||
Index[] indices = new Index[]{index1, index2, index3};
|
||||
|
||||
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
|
||||
assertNotSame(state, updatedState);
|
||||
assertTrue(blockedIndices.contains(updatedState.metaData().index("index-1").getIndex()));
|
||||
assertTrue(blockedIndices.contains(updatedState.metaData().index("index-2").getIndex()));
|
||||
assertTrue(blockedIndices.contains(updatedState.metaData().index("index-3").getIndex()));
|
||||
if (mixedVersions) {
|
||||
assertIsClosed("index-1", updatedState);
|
||||
assertIsClosed("index-2", updatedState);
|
||||
assertIsClosed("index-2", updatedState);
|
||||
} else {
|
||||
assertIsBlocked("index-1", updatedState, true);
|
||||
assertIsBlocked("index-2", updatedState, true);
|
||||
assertIsBlocked("index-3", updatedState, true);
|
||||
|
||||
for (Index index : indices) {
|
||||
assertTrue(blockedIndices.containsKey(index));
|
||||
if (mixedVersions) {
|
||||
assertIsClosed(index.getName(), updatedState);
|
||||
} else {
|
||||
assertHasBlock(index.getName(), updatedState, blockedIndices.get(index));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void testAddIndexClosedBlocksReusesBlocks() {
|
||||
ClusterState state = ClusterState.builder(new ClusterName("testAddIndexClosedBlocksReuseBlocks")).build();
|
||||
state = addOpenedIndex("test", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
|
||||
|
||||
Index test = state.metaData().index("test").getIndex();
|
||||
Index[] indices = new Index[]{test};
|
||||
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
|
||||
|
||||
assertTrue(blockedIndices.containsKey(test));
|
||||
assertHasBlock(test.getName(), state, blockedIndices.get(test));
|
||||
|
||||
final Map<Index, ClusterBlock> blockedIndices2 = new HashMap<>();
|
||||
state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices2, state);
|
||||
|
||||
assertTrue(blockedIndices2.containsKey(test));
|
||||
assertHasBlock(test.getName(), state, blockedIndices2.get(test));
|
||||
assertEquals(blockedIndices.get(test), blockedIndices2.get(test));
|
||||
}
|
||||
|
||||
public void testValidateShardLimit() {
|
||||
int nodesInCluster = randomIntBetween(2,100);
|
||||
ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
|
||||
|
@ -251,11 +288,12 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
}
|
||||
|
||||
private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
|
||||
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, INDEX_CLOSED_BLOCK);
|
||||
}
|
||||
|
||||
private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
|
||||
private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state,
|
||||
final ClusterBlock closingBlock) {
|
||||
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, closingBlock);
|
||||
}
|
||||
|
||||
private static ClusterState addRestoredIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
|
||||
|
@ -329,16 +367,21 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
|
|||
private static void assertIsOpened(final String indexName, final ClusterState clusterState) {
|
||||
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.OPEN));
|
||||
assertThat(clusterState.routingTable().index(indexName), notNullValue());
|
||||
assertIsBlocked(indexName, clusterState, false);
|
||||
}
|
||||
|
||||
private static void assertIsClosed(final String indexName, final ClusterState clusterState) {
|
||||
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE));
|
||||
assertThat(clusterState.routingTable().index(indexName), nullValue());
|
||||
assertIsBlocked(indexName, clusterState, true);
|
||||
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
|
||||
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
|
||||
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
|
||||
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
|
||||
}
|
||||
|
||||
private static void assertIsBlocked(final String indexName, final ClusterState clusterState, final boolean blocked) {
|
||||
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(blocked));
|
||||
private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) {
|
||||
assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true));
|
||||
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
|
||||
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
|
||||
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,10 +20,10 @@ package org.elasticsearch.cluster.metadata;
|
|||
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.index.Index;
|
||||
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class MetaDataIndexStateServiceUtils {
|
||||
|
||||
|
@ -31,16 +31,19 @@ public class MetaDataIndexStateServiceUtils {
|
|||
}
|
||||
|
||||
/**
|
||||
* Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], ClusterState, Set)} which is a protected method.
|
||||
* Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], Map, ClusterState)} which is a protected method.
|
||||
*/
|
||||
public static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState state, final Set<Index> blockedIndices) {
|
||||
return MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
|
||||
public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index, ClusterBlock> blockedIndices,
|
||||
final ClusterState state) {
|
||||
return MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map)} which is a protected method.
|
||||
* Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map, Map)} which is a protected method.
|
||||
*/
|
||||
public static ClusterState closeRoutingTable(final ClusterState state, final Map<Index, AcknowledgedResponse> results) {
|
||||
return MetaDataIndexStateService.closeRoutingTable(state, results);
|
||||
public static ClusterState closeRoutingTable(final ClusterState state,
|
||||
final Map<Index, ClusterBlock> blockedIndices,
|
||||
final Map<Index, AcknowledgedResponse> results) {
|
||||
return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,6 +52,7 @@ import org.elasticsearch.cluster.EmptyClusterInfoService;
|
|||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
|
||||
import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
|
||||
import org.elasticsearch.cluster.metadata.AliasValidator;
|
||||
|
@ -94,9 +95,10 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
|
@ -222,10 +224,10 @@ public class ClusterStateChanges {
|
|||
final Index[] concreteIndices = Arrays.stream(request.indices())
|
||||
.map(index -> state.metaData().index(index).getIndex()).toArray(Index[]::new);
|
||||
|
||||
final Set<Index> blockedIndices = new HashSet<>();
|
||||
ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, state, blockedIndices);
|
||||
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
|
||||
ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state);
|
||||
|
||||
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices.stream()
|
||||
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream()
|
||||
.collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true))));
|
||||
return allocationService.reroute(newState, "indices closed");
|
||||
}
|
||||
|
|
|
@ -39,12 +39,12 @@ import java.util.Locale;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.hasItem;
|
||||
import static org.hamcrest.Matchers.hasSize;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
@ -135,6 +135,7 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
final int nbDocs = randomIntBetween(10, 50);
|
||||
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
|
||||
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
|
||||
ensureYellowAndNoInitializingShards(indexName);
|
||||
|
||||
final CountDownLatch startClosing = new CountDownLatch(1);
|
||||
final Thread[] threads = new Thread[randomIntBetween(2, 5)];
|
||||
|
@ -146,7 +147,11 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
} catch (InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
assertAcked(client().admin().indices().prepareClose(indexName));
|
||||
try {
|
||||
client().admin().indices().prepareClose(indexName).get();
|
||||
} catch (final Exception e) {
|
||||
assertException(e, indexName);
|
||||
}
|
||||
});
|
||||
threads[i].start();
|
||||
}
|
||||
|
@ -238,18 +243,84 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
static void assertIndexIsClosed(final String indexName) {
|
||||
public void testConcurrentClosesAndOpens() throws Exception {
|
||||
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
|
||||
createIndex(indexName);
|
||||
|
||||
final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client());
|
||||
waitForDocs(1, indexer);
|
||||
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
final Runnable waitForLatch = () -> {
|
||||
try {
|
||||
latch.await();
|
||||
} catch (final InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
};
|
||||
|
||||
final List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < randomIntBetween(1, 3); i++) {
|
||||
threads.add(new Thread(() -> {
|
||||
try {
|
||||
waitForLatch.run();
|
||||
client().admin().indices().prepareClose(indexName).get();
|
||||
} catch (final Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
for (int i = 0; i < randomIntBetween(1, 3); i++) {
|
||||
threads.add(new Thread(() -> {
|
||||
try {
|
||||
waitForLatch.run();
|
||||
assertAcked(client().admin().indices().prepareOpen(indexName).get());
|
||||
} catch (final Exception e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
for (Thread thread : threads) {
|
||||
thread.start();
|
||||
}
|
||||
latch.countDown();
|
||||
for (Thread thread : threads) {
|
||||
thread.join();
|
||||
}
|
||||
|
||||
indexer.setAssertNoFailuresOnStop(false);
|
||||
indexer.stop();
|
||||
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.CLOSE));
|
||||
assertThat(clusterState.routingTable().index(indexName), nullValue());
|
||||
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
|
||||
if (clusterState.metaData().indices().get(indexName).getState() == IndexMetaData.State.CLOSE) {
|
||||
assertIndexIsClosed(indexName);
|
||||
assertAcked(client().admin().indices().prepareOpen(indexName));
|
||||
}
|
||||
refresh(indexName);
|
||||
assertIndexIsOpened(indexName);
|
||||
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexer.totalIndexedDocs());
|
||||
}
|
||||
|
||||
static void assertIndexIsOpened(final String indexName) {
|
||||
static void assertIndexIsClosed(final String... indices) {
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN));
|
||||
assertThat(clusterState.routingTable().index(indexName), notNullValue());
|
||||
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false));
|
||||
for (String index : indices) {
|
||||
assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.CLOSE));
|
||||
assertThat(clusterState.routingTable().index(index), nullValue());
|
||||
assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
|
||||
assertThat("Index " + index + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
|
||||
clusterState.blocks().indices().getOrDefault(index, emptySet()).stream()
|
||||
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
|
||||
}
|
||||
}
|
||||
|
||||
static void assertIndexIsOpened(final String... indices) {
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
for (String index : indices) {
|
||||
assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN));
|
||||
assertThat(clusterState.routingTable().index(index), notNullValue());
|
||||
assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false));
|
||||
}
|
||||
}
|
||||
|
||||
static void assertException(final Throwable throwable, final String indexName) {
|
||||
|
@ -257,7 +328,7 @@ public class CloseIndexIT extends ESIntegTestCase {
|
|||
if (t instanceof ClusterBlockException) {
|
||||
ClusterBlockException clusterBlockException = (ClusterBlockException) t;
|
||||
assertThat(clusterBlockException.blocks(), hasSize(1));
|
||||
assertThat(clusterBlockException.blocks(), hasItem(MetaDataIndexStateService.INDEX_CLOSED_BLOCK));
|
||||
assertTrue(clusterBlockException.blocks().stream().allMatch(b -> b.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID));
|
||||
} else if (t instanceof IndexClosedException) {
|
||||
IndexClosedException indexClosedException = (IndexClosedException) t;
|
||||
assertThat(indexClosedException.getIndex(), notNullValue());
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.indices.state;
|
|||
|
||||
import org.elasticsearch.action.ActionRequestValidationException;
|
||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
|
||||
import org.elasticsearch.action.index.IndexRequestBuilder;
|
||||
import org.elasticsearch.action.search.SearchResponse;
|
||||
|
@ -46,6 +45,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_RE
|
|||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
|
||||
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
|
||||
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
|
||||
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
|
||||
|
@ -53,7 +54,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
|
|||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
public class OpenCloseIndexIT extends ESIntegTestCase {
|
||||
public void testSimpleCloseOpen() {
|
||||
|
@ -258,23 +258,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
|
|||
ensureGreen("test");
|
||||
}
|
||||
|
||||
private void assertIndexIsOpened(String... indices) {
|
||||
checkIndexState(IndexMetaData.State.OPEN, indices);
|
||||
}
|
||||
|
||||
private void assertIndexIsClosed(String... indices) {
|
||||
checkIndexState(IndexMetaData.State.CLOSE, indices);
|
||||
}
|
||||
|
||||
private void checkIndexState(IndexMetaData.State expectedState, String... indices) {
|
||||
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet();
|
||||
for (String index : indices) {
|
||||
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index);
|
||||
assertThat(indexMetaData, notNullValue());
|
||||
assertThat(indexMetaData.getState(), equalTo(expectedState));
|
||||
}
|
||||
}
|
||||
|
||||
public void testOpenCloseWithDocs() throws IOException, ExecutionException, InterruptedException {
|
||||
String mapping = Strings.toString(XContentFactory.jsonBuilder().
|
||||
startObject().
|
||||
|
|
|
@ -0,0 +1,167 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.indices.state;
|
||||
|
||||
import org.elasticsearch.action.ActionFuture;
|
||||
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
|
||||
import org.elasticsearch.action.support.master.AcknowledgedResponse;
|
||||
import org.elasticsearch.cluster.ClusterState;
|
||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.common.Glob;
|
||||
import org.elasticsearch.common.Strings;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.util.concurrent.RunOnce;
|
||||
import org.elasticsearch.plugins.Plugin;
|
||||
import org.elasticsearch.test.ESIntegTestCase;
|
||||
import org.elasticsearch.test.transport.MockTransportService;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID;
|
||||
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
|
||||
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
|
||||
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
|
||||
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2)
|
||||
public class ReopenWhileClosingIT extends ESIntegTestCase {
|
||||
|
||||
@Override
|
||||
protected Collection<Class<? extends Plugin>> nodePlugins() {
|
||||
return singletonList(MockTransportService.TestPlugin.class);
|
||||
}
|
||||
|
||||
public void testReopenDuringClose() throws Exception {
|
||||
final String indexName = "test";
|
||||
createIndexWithDocs(indexName);
|
||||
|
||||
ensureYellowAndNoInitializingShards(indexName);
|
||||
|
||||
final CountDownLatch block = new CountDownLatch(1);
|
||||
final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown);
|
||||
|
||||
ActionFuture<AcknowledgedResponse> closeIndexResponse = client().admin().indices().prepareClose(indexName).execute();
|
||||
assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS));
|
||||
assertIndexIsBlocked(indexName);
|
||||
assertFalse(closeIndexResponse.isDone());
|
||||
|
||||
assertAcked(client().admin().indices().prepareOpen(indexName));
|
||||
|
||||
releaseBlock.close();
|
||||
assertFalse(closeIndexResponse.get().isAcknowledged());
|
||||
assertIndexIsOpened(indexName);
|
||||
}
|
||||
|
||||
public void testReopenDuringCloseOnMultipleIndices() throws Exception {
|
||||
final List<String> indices = new ArrayList<>();
|
||||
for (int i = 0; i < randomIntBetween(2, 10); i++) {
|
||||
indices.add("index-" + i);
|
||||
createIndexWithDocs(indices.get(i));
|
||||
}
|
||||
|
||||
ensureYellowAndNoInitializingShards(indices.toArray(Strings.EMPTY_ARRAY));
|
||||
|
||||
final CountDownLatch block = new CountDownLatch(1);
|
||||
final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown);
|
||||
|
||||
ActionFuture<AcknowledgedResponse> closeIndexResponse = client().admin().indices().prepareClose("index-*").execute();
|
||||
assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS));
|
||||
assertFalse(closeIndexResponse.isDone());
|
||||
indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked);
|
||||
|
||||
final List<String> reopenedIndices = randomSubsetOf(randomIntBetween(1, indices.size()), indices);
|
||||
assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY)));
|
||||
|
||||
releaseBlock.close();
|
||||
assertFalse(closeIndexResponse.get().isAcknowledged());
|
||||
|
||||
indices.forEach(index -> {
|
||||
if (reopenedIndices.contains(index)) {
|
||||
assertIndexIsOpened(index);
|
||||
} else {
|
||||
assertIndexIsClosed(index);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void createIndexWithDocs(final String indexName) {
|
||||
createIndex(indexName);
|
||||
final int nbDocs = scaledRandomIntBetween(1, 100);
|
||||
for (int i = 0; i < nbDocs; i++) {
|
||||
index(indexName, "_doc", String.valueOf(i), "num", i);
|
||||
}
|
||||
assertIndexIsOpened(indexName);
|
||||
}
|
||||
|
||||
/**
|
||||
* Intercepts and blocks the {@link TransportVerifyShardBeforeCloseAction} executed for the given index pattern.
|
||||
*/
|
||||
private Releasable interceptVerifyShardBeforeCloseActions(final String indexPattern, final Runnable onIntercept) {
|
||||
final MockTransportService mockTransportService = (MockTransportService) internalCluster()
|
||||
.getInstance(TransportService.class, internalCluster().getMasterName());
|
||||
|
||||
final CountDownLatch release = new CountDownLatch(1);
|
||||
for (DiscoveryNode node : internalCluster().clusterService().state().getNodes()) {
|
||||
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()),
|
||||
(connection, requestId, action, request, options) -> {
|
||||
if (action.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) {
|
||||
if (request instanceof TransportVerifyShardBeforeCloseAction.ShardRequest) {
|
||||
final String index = ((TransportVerifyShardBeforeCloseAction.ShardRequest) request).shardId().getIndexName();
|
||||
if (Glob.globMatch(indexPattern, index)) {
|
||||
logger.info("request {} intercepted for index {}", requestId, index);
|
||||
onIntercept.run();
|
||||
try {
|
||||
release.await();
|
||||
logger.info("request {} released for index {}", requestId, index);
|
||||
} catch (final InterruptedException e) {
|
||||
throw new AssertionError(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
connection.sendRequest(requestId, action, request, options);
|
||||
});
|
||||
}
|
||||
final RunOnce releaseOnce = new RunOnce(release::countDown);
|
||||
return releaseOnce::run;
|
||||
}
|
||||
|
||||
private static void assertIndexIsBlocked(final String... indices) {
|
||||
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
|
||||
for (String index : indices) {
|
||||
assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN));
|
||||
assertThat(clusterState.routingTable().index(index), notNullValue());
|
||||
assertThat("Index " + index + " must have only 1 block with [id=" + INDEX_CLOSED_BLOCK_ID + "]",
|
||||
clusterState.blocks().indices().getOrDefault(index, emptySet()).stream()
|
||||
.filter(clusterBlock -> clusterBlock.id() == INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -544,7 +544,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
|
|||
logger.info("--> start snapshot with default settings and closed index - should be blocked");
|
||||
assertBlocked(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
|
||||
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
|
||||
.setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
|
||||
.setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
|
||||
|
||||
|
||||
logger.info("--> start snapshot with default settings without a closed index - should fail");
|
||||
|
|
|
@ -1562,7 +1562,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
logger.info("--> snapshot with closed index");
|
||||
assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
|
||||
.setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
|
||||
.setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
|
||||
}
|
||||
|
||||
public void testSnapshotSingleClosedIndex() throws Exception {
|
||||
|
@ -1580,7 +1580,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
|
||||
logger.info("--> snapshot");
|
||||
assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
|
||||
.setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
|
||||
.setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
|
||||
}
|
||||
|
||||
public void testRenameOnRestore() throws Exception {
|
||||
|
|
|
@ -129,7 +129,7 @@ public class ElasticsearchAssertions {
|
|||
* @param builder the request builder
|
||||
*/
|
||||
public static void assertBlocked(ActionRequestBuilder builder) {
|
||||
assertBlocked(builder, null);
|
||||
assertBlocked(builder, (ClusterBlock) null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -155,9 +155,9 @@ public class ElasticsearchAssertions {
|
|||
* Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}.
|
||||
*
|
||||
* @param builder the request builder
|
||||
* @param expectedBlock the expected block
|
||||
* @param expectedBlockId the expected block id
|
||||
*/
|
||||
public static void assertBlocked(ActionRequestBuilder builder, ClusterBlock expectedBlock) {
|
||||
public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final Integer expectedBlockId) {
|
||||
try {
|
||||
builder.get();
|
||||
fail("Request executed with success but a ClusterBlockException was expected");
|
||||
|
@ -165,19 +165,29 @@ public class ElasticsearchAssertions {
|
|||
assertThat(e.blocks().size(), greaterThan(0));
|
||||
assertThat(e.status(), equalTo(RestStatus.FORBIDDEN));
|
||||
|
||||
if (expectedBlock != null) {
|
||||
if (expectedBlockId != null) {
|
||||
boolean found = false;
|
||||
for (ClusterBlock clusterBlock : e.blocks()) {
|
||||
if (clusterBlock.id() == expectedBlock.id()) {
|
||||
if (clusterBlock.id() == expectedBlockId) {
|
||||
found = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertThat("Request should have been blocked by [" + expectedBlock + "] instead of " + e.blocks(), found, equalTo(true));
|
||||
assertThat("Request should have been blocked by [" + expectedBlockId + "] instead of " + e.blocks(), found, equalTo(true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}.
|
||||
*
|
||||
* @param builder the request builder
|
||||
* @param expectedBlock the expected block
|
||||
*/
|
||||
public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final ClusterBlock expectedBlock) {
|
||||
assertBlocked(builder, expectedBlock != null ? expectedBlock.id() : null);
|
||||
}
|
||||
|
||||
public static String formatShardStatus(BroadcastResponse response) {
|
||||
StringBuilder msg = new StringBuilder();
|
||||
msg.append(" Total shards: ").append(response.getTotalShards())
|
||||
|
|
Loading…
Reference in New Issue