Merge branch 'close-index-api-refactoring'

This commit is contained in:
Tanguy Leroux 2019-01-09 11:48:57 +01:00
commit f1f5d834c3
23 changed files with 2280 additions and 280 deletions

View File

@ -22,10 +22,10 @@ package org.elasticsearch.test.rest;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.Request;
import org.junit.After;
import org.junit.Before;

View File

@ -25,7 +25,13 @@ import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
*/
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {
CloseIndexClusterStateUpdateRequest() {
private final long taskId;
public CloseIndexClusterStateUpdateRequest(final long taskId) {
this.taskId = taskId;
}
public long taskId() {
return taskId;
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -100,24 +99,32 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
@Override
protected void masterOperation(final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}
@Override
protected void masterOperation(final Task task, final CloseIndexRequest request, final ClusterState state,
final ActionListener<AcknowledgedResponse> listener) throws Exception {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
if (concreteIndices == null || concreteIndices.length == 0) {
listener.onResponse(new AcknowledgedResponse(true));
return;
}
CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
indexStateService.closeIndices(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
public void onResponse(final AcknowledgedResponse response) {
listener.onResponse(response);
}
@Override
public void onFailure(Exception t) {
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}

View File

@ -0,0 +1,176 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.close;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import java.io.IOException;
import java.util.Objects;
import java.util.function.Consumer;
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
public static final String NAME = CloseIndexAction.NAME + "[s]";
@Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
final ClusterService clusterService, final IndicesService indicesService,
final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
}
@Override
protected ReplicationResponse newResponseInstance() {
return new ReplicationResponse();
}
@Override
protected void acquirePrimaryOperationPermit(final IndexShard primary,
final ShardRequest request,
final ActionListener<Releasable> onAcquired) {
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
}
@Override
protected void acquireReplicaOperationPermit(final IndexShard replica,
final ShardRequest request,
final ActionListener<Releasable> onAcquired,
final long primaryTerm,
final long globalCheckpoint,
final long maxSeqNoOfUpdateOrDeletes) {
replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout());
}
@Override
protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
final IndexShard primary) throws Exception {
executeShardOperation(shardRequest, primary);
return new PrimaryResult<>(shardRequest, new ReplicationResponse());
}
@Override
protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
executeShardOperation(shardRequest, replica);
return new ReplicaResult();
}
private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
final ShardId shardId = indexShard.shardId();
if (indexShard.getActiveOperationsCount() != 0) {
throw new IllegalStateException("On-going operations in progress while checking index shard " + shardId + " before closing");
}
final ClusterBlocks clusterBlocks = clusterService.state().blocks();
if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
throw new IllegalStateException("Index shard " + shardId + " must be blocked by " + request.clusterBlock() + " before closing");
}
final long maxSeqNo = indexShard.seqNoStats().getMaxSeqNo();
if (indexShard.getGlobalCheckpoint() != maxSeqNo) {
throw new IllegalStateException("Global checkpoint [" + indexShard.getGlobalCheckpoint()
+ "] mismatches maximum sequence number [" + maxSeqNo + "] on index shard " + shardId);
}
indexShard.flush(new FlushRequest());
logger.debug("{} shard is ready for closing", shardId);
}
@Override
protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy(final long primaryTerm) {
return new VerifyShardBeforeCloseActionReplicasProxy(primaryTerm);
}
/**
* A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification
* and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary
* or reopened in an unverified state with potential non flushed translog operations.
*/
class VerifyShardBeforeCloseActionReplicasProxy extends ReplicasProxy {
VerifyShardBeforeCloseActionReplicasProxy(final long primaryTerm) {
super(primaryTerm);
}
@Override
public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final Runnable onSuccess,
final Consumer<Exception> onPrimaryDemoted, final Consumer<Exception> onIgnoredFailure) {
shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null,
createShardActionListener(onSuccess, onPrimaryDemoted, onIgnoredFailure));
}
}
public static class ShardRequest extends ReplicationRequest<ShardRequest> {
private ClusterBlock clusterBlock;
ShardRequest(){
}
public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
super(shardId);
this.clusterBlock = Objects.requireNonNull(clusterBlock);
setParentTask(parentTaskId);
}
@Override
public String toString() {
return "verify shard " + shardId + " before close with block " + clusterBlock;
}
@Override
public void readFrom(final StreamInput in) throws IOException {
super.readFrom(in);
clusterBlock = ClusterBlock.readClusterBlock(in);
}
@Override
public void writeTo(final StreamOutput out) throws IOException {
super.writeTo(out);
clusterBlock.writeTo(out);
}
public ClusterBlock clusterBlock() {
return clusterBlock;
}
}
}

View File

@ -19,6 +19,8 @@
package org.elasticsearch.cluster.block;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
@ -30,29 +32,31 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Locale;
import java.util.Objects;
public class ClusterBlock implements Streamable, ToXContentFragment {
private int id;
private @Nullable String uuid;
private String description;
private EnumSet<ClusterBlockLevel> levels;
private boolean retryable;
private boolean disableStatePersistence = false;
private boolean allowReleaseResources;
private RestStatus status;
ClusterBlock() {
private ClusterBlock() {
}
public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence, boolean allowReleaseResources,
RestStatus status, EnumSet<ClusterBlockLevel> levels) {
public ClusterBlock(int id, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this(id, null, description, retryable, disableStatePersistence, allowReleaseResources, status, levels);
}
public ClusterBlock(int id, String uuid, String description, boolean retryable, boolean disableStatePersistence,
boolean allowReleaseResources, RestStatus status, EnumSet<ClusterBlockLevel> levels) {
this.id = id;
this.uuid = uuid;
this.description = description;
this.retryable = retryable;
this.disableStatePersistence = disableStatePersistence;
@ -65,6 +69,10 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
return this.id;
}
public String uuid() {
return uuid;
}
public String description() {
return this.description;
}
@ -104,6 +112,9 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(Integer.toString(id));
if (uuid != null) {
builder.field("uuid", uuid);
}
builder.field("description", description);
builder.field("retryable", retryable);
if (disableStatePersistence) {
@ -127,6 +138,11 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
@Override
public void readFrom(StreamInput in) throws IOException {
id = in.readVInt();
if (in.getVersion().onOrAfter(Version.V_7_0_0)) {
uuid = in.readOptionalString();
} else {
uuid = null;
}
description = in.readString();
final int len = in.readVInt();
ArrayList<ClusterBlockLevel> levels = new ArrayList<>(len);
@ -143,6 +159,9 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(id);
if (out.getVersion().onOrAfter(Version.V_7_0_0)) {
out.writeOptionalString(uuid);
}
out.writeString(description);
out.writeVInt(levels.size());
for (ClusterBlockLevel level : levels) {
@ -157,7 +176,11 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(id).append(",").append(description).append(", blocks ");
sb.append(id).append(",");
if (uuid != null) {
sb.append(uuid).append(',');
}
sb.append(description).append(", blocks ");
String delimiter = "";
for (ClusterBlockLevel level : levels) {
sb.append(delimiter).append(level.name());
@ -168,19 +191,19 @@ public class ClusterBlock implements Streamable, ToXContentFragment {
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ClusterBlock that = (ClusterBlock) o;
if (id != that.id) return false;
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ClusterBlock that = (ClusterBlock) o;
return id == that.id && Objects.equals(uuid, that.uuid);
}
@Override
public int hashCode() {
return id;
return Objects.hash(id, uuid);
}
public boolean isAllowReleaseResources() {

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -147,6 +148,31 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
return indicesBlocks.containsKey(index) && indicesBlocks.get(index).contains(block);
}
public boolean hasIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return true;
}
}
}
return false;
}
@Nullable
public ClusterBlock getIndexBlockWithId(final String index, final int blockId) {
final Set<ClusterBlock> clusterBlocks = indicesBlocks.get(index);
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == blockId) {
return clusterBlock;
}
}
}
return null;
}
public void globalBlockedRaiseException(ClusterBlockLevel level) throws ClusterBlockException {
ClusterBlockException blockException = globalBlockedException(level);
if (blockException != null) {
@ -403,6 +429,18 @@ public class ClusterBlocks extends AbstractDiffable<ClusterBlocks> {
return this;
}
public Builder removeIndexBlockWithId(String index, int blockId) {
final Set<ClusterBlock> indexBlocks = indices.get(index);
if (indexBlocks == null) {
return this;
}
indexBlocks.removeIf(block -> block.id() == blockId);
if (indexBlocks.isEmpty()) {
indices.remove(index);
}
return this;
}
public ClusterBlocks build() {
// We copy the block sets here in case of the builder is modified after build is called
ImmutableOpenMap.Builder<String, Set<ClusterBlock>> indicesBuilder = ImmutableOpenMap.builder(indices.size());

View File

@ -19,40 +19,67 @@
package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableMap;
/**
* Service responsible for submitting open/close index requests
@ -60,54 +87,130 @@ import java.util.Set;
public class MetaDataIndexStateService {
private static final Logger logger = LogManager.getLogger(MetaDataIndexStateService.class);
public static final int INDEX_CLOSED_BLOCK_ID = 4;
public static final ClusterBlock INDEX_CLOSED_BLOCK = new ClusterBlock(4, "index closed", false,
false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
private final ClusterService clusterService;
private final AllocationService allocationService;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
private final IndicesService indicesService;
private final ThreadPool threadPool;
private final TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction;
private final ActiveShardsObserver activeShardsObserver;
@Inject
public MetaDataIndexStateService(ClusterService clusterService, AllocationService allocationService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService,
IndicesService indicesService, ThreadPool threadPool) {
IndicesService indicesService, ThreadPool threadPool,
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction) {
this.indicesService = indicesService;
this.clusterService = clusterService;
this.allocationService = allocationService;
this.threadPool = threadPool;
this.transportVerifyShardBeforeCloseAction = transportVerifyShardBeforeCloseAction;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
}
public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) {
if (request.indices() == null || request.indices().length == 0) {
/**
* Closes one or more indices.
*
* Closing indices is a 3 steps process: it first adds a write block to every indices to close, then waits for the operations on shards
* to be terminated and finally closes the indices by moving their state to CLOSE.
*/
public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
final Index[] concreteIndices = request.indices();
if (concreteIndices == null || concreteIndices.length == 0) {
throw new IllegalArgumentException("Index name is required");
}
final String indicesAsString = Arrays.toString(request.indices());
clusterService.submitStateUpdateTask("close-indices " + indicesAsString,
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {
private final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
@Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) {
return new ClusterStateUpdateResponse(acknowledged);
public ClusterState execute(final ClusterState currentState) {
return addIndexClosedBlocks(concreteIndices, blockedIndices, currentState);
}
@Override
public ClusterState execute(ClusterState currentState) {
return closeIndices(currentState, request.indices(), indicesAsString);
public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
if (oldState == newState) {
assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
listener.onResponse(new AcknowledgedResponse(true));
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new WaitForClosedBlocksApplied(blockedIndices, request,
ActionListener.wrap(results ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
boolean acknowledged = true;
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
final ClusterState updatedState = closeRoutingTable(currentState, blockedIndices, results);
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
IndexMetaData updatedMetaData = updatedState.metaData().index(result.getKey());
if (updatedMetaData != null && updatedMetaData.getState() != IndexMetaData.State.CLOSE) {
acknowledged = false;
break;
}
});
}
return allocationService.reroute(updatedState, "indices closed");
}
public ClusterState closeIndices(ClusterState currentState, final Index[] indices, String indicesAsString) {
Set<IndexMetaData> indicesToClose = new HashSet<>();
@Override
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(final String source,
final ClusterState oldState, final ClusterState newState) {
listener.onResponse(new AcknowledgedResponse(acknowledged));
}
}),
listener::onFailure)
)
);
}
}
@Override
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
}
}
);
}
/**
* Step 1 - Start closing indices by adding a write block
*
* This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds a unique cluster
* block (or reuses an existing one) to every index to close in the cluster state. After the cluster state is published, the shards
* should start to reject writing operations and we can proceed with step 2.
*/
static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index, ClusterBlock> blockedIndices,
final ClusterState currentState) {
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
final Set<IndexMetaData> indicesToClose = new HashSet<>();
for (Index index : indices) {
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
final IndexMetaData indexMetaData = metadata.getSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
indicesToClose.add(indexMetaData);
} else {
logger.debug("index {} is already closed, ignoring", index);
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
}
}
@ -119,28 +222,204 @@ public class MetaDataIndexStateService {
RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose);
logger.info("closing indices [{}]", indicesAsString);
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
for (IndexMetaData openIndexMetadata : indicesToClose) {
final String indexName = openIndexMetadata.getIndex().getName();
mdBuilder.put(IndexMetaData.builder(openIndexMetadata).state(IndexMetaData.State.CLOSE));
blocksBuilder.addIndexBlock(indexName, INDEX_CLOSED_BLOCK);
// If the cluster is in a mixed version that does not support the shard close action,
// we use the previous way to close indices and directly close them without sanity checks
final boolean useDirectClose = currentState.nodes().getMinNodeVersion().before(Version.V_7_0_0);
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
for (IndexMetaData indexToClose : indicesToClose) {
final Index index = indexToClose.getIndex();
ClusterBlock indexBlock = null;
final Set<ClusterBlock> clusterBlocks = currentState.blocks().indices().get(index.getName());
if (clusterBlocks != null) {
for (ClusterBlock clusterBlock : clusterBlocks) {
if (clusterBlock.id() == INDEX_CLOSED_BLOCK_ID) {
// Reuse the existing index closed block
indexBlock = clusterBlock;
break;
}
}
}
if (useDirectClose) {
logger.debug("closing index {} directly", index);
metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE)); // increment version?
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
routingTable.remove(index.getName());
indexBlock = INDEX_CLOSED_BLOCK;
} else {
if (indexBlock == null) {
// Create a new index closed block
indexBlock = createIndexClosingBlock();
}
assert Strings.hasLength(indexBlock.uuid()) : "Closing block should have a UUID";
}
blocks.addIndexBlock(index.getName(), indexBlock);
blockedIndices.put(index, indexBlock);
}
ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
for (IndexMetaData index : indicesToClose) {
rtBuilder.remove(index.getIndex().getName());
logger.info(() -> new ParameterizedMessage("closing indices {}",
blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.joining(","))));
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
}
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
"indices closed [" + indicesAsString + "]");
/**
* Step 2 - Wait for indices to be ready for closing
* <p>
* This step iterates over the indices previously blocked and sends a {@link TransportVerifyShardBeforeCloseAction} to each shard. If
* this action succeed then the shard is considered to be ready for closing. When all shards of a given index are ready for closing,
* the index is considered ready to be closed.
*/
class WaitForClosedBlocksApplied extends AbstractRunnable {
private final Map<Index, ClusterBlock> blockedIndices;
private final CloseIndexClusterStateUpdateRequest request;
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;
private WaitForClosedBlocksApplied(final Map<Index, ClusterBlock> blockedIndices,
final CloseIndexClusterStateUpdateRequest request,
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed blocks to be applied, list of blocked indices is empty or null");
}
this.blockedIndices = blockedIndices;
this.request = request;
this.listener = listener;
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
final Map<Index, AcknowledgedResponse> results = ConcurrentCollections.newConcurrentMap();
final CountDown countDown = new CountDown(blockedIndices.size());
final ClusterState state = clusterService.state();
blockedIndices.forEach((index, block) -> {
waitForShardsReadyForClosing(index, block, state, response -> {
results.put(index, response);
if (countDown.countDown()) {
listener.onResponse(unmodifiableMap(results));
}
});
});
}
private void waitForShardsReadyForClosing(final Index index, final ClusterBlock closingBlock,
final ClusterState state, final Consumer<AcknowledgedResponse> onResponse) {
final IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index);
onResponse.accept(new AcknowledgedResponse(true));
return;
}
final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
assert state.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
onResponse.accept(new AcknowledgedResponse(true));
return;
}
final ImmutableOpenIntMap<IndexShardRoutingTable> shards = indexRoutingTable.getShards();
final AtomicArray<AcknowledgedResponse> results = new AtomicArray<>(shards.size());
final CountDown countDown = new CountDown(shards.size());
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
final IndexShardRoutingTable shardRoutingTable = shard.value;
final ShardId shardId = shardRoutingTable.shardId();
sendVerifyShardBeforeCloseRequest(shardRoutingTable, closingBlock, new NotifyOnceListener<ReplicationResponse>() {
@Override
public void innerOnResponse(final ReplicationResponse replicationResponse) {
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
results.setOnce(shardId.id(), new AcknowledgedResponse(shardInfo.getFailed() == 0));
processIfFinished();
}
@Override
public void innerOnFailure(final Exception e) {
results.setOnce(shardId.id(), new AcknowledgedResponse(false));
processIfFinished();
}
private void processIfFinished() {
if (countDown.countDown()) {
final boolean acknowledged = results.asList().stream().allMatch(AcknowledgedResponse::isAcknowledged);
onResponse.accept(new AcknowledgedResponse(acknowledged));
}
}
});
}
}
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable,
final ClusterBlock closingBlock,
final ActionListener<ReplicationResponse> listener) {
final ShardId shardId = shardRoutingTable.shardId();
if (shardRoutingTable.primaryShard().unassigned()) {
logger.debug("primary shard {} is unassigned, ignoring", shardId);
final ReplicationResponse response = new ReplicationResponse();
response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size()));
listener.onResponse(response);
return;
}
final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, closingBlock, parentTaskId);
if (request.ackTimeout() != null) {
shardRequest.timeout(request.ackTimeout());
}
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
}
}
/**
* Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing.
*/
static ClusterState closeRoutingTable(final ClusterState currentState,
final Map<Index, ClusterBlock> blockedIndices,
final Map<Index, AcknowledgedResponse> results) {
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
final Set<String> closedIndices = new HashSet<>();
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
final Index index = result.getKey();
final boolean acknowledged = result.getValue().isAcknowledged();
try {
if (acknowledged == false) {
logger.debug("verification of shards before closing {} failed", index);
continue;
}
final IndexMetaData indexMetaData = metadata.getSafe(index);
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
logger.debug("verification of shards before closing {} succeeded but index is already closed", index);
assert currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
continue;
}
final ClusterBlock closingBlock = blockedIndices.get(index);
if (currentState.blocks().hasIndexBlock(index.getName(), closingBlock) == false) {
logger.debug("verification of shards before closing {} succeeded but block has been removed in the meantime", index);
continue;
}
logger.debug("closing index {} succeeded", index);
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID).addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
routingTable.remove(index.getName());
closedIndices.add(index.getName());
} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
}
}
logger.info("completed closing of indices {}", closedIndices);
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
}
public void openIndex(final OpenIndexClusterStateUpdateRequest request,
@ -177,57 +456,66 @@ public class MetaDataIndexStateService {
}
@Override
public ClusterState execute(ClusterState currentState) {
List<IndexMetaData> indicesToOpen = new ArrayList<>();
for (Index index : request.indices()) {
public ClusterState execute(final ClusterState currentState) {
final ClusterState updatedState = openIndices(request.indices(), currentState);
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(updatedState, "indices opened [" + indicesAsString + "]");
}
}
);
}
ClusterState openIndices(final Index[] indices, final ClusterState currentState) {
final List<IndexMetaData> indicesToOpen = new ArrayList<>();
for (Index index : indices) {
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
indicesToOpen.add(indexMetaData);
} else if (currentState.blocks().hasIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID)) {
indicesToOpen.add(indexMetaData);
}
}
validateShardLimit(currentState, request.indices());
validateShardLimit(currentState, indices);
if (indicesToOpen.isEmpty()) {
return currentState;
}
logger.info("opening indices [{}]", indicesAsString);
logger.info(() -> new ParameterizedMessage("opening indices [{}]",
String.join(",", indicesToOpen.stream().map(i -> (CharSequence) i.getIndex().toString())::iterator)));
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder()
.blocks(currentState.blocks());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion()
.minimumIndexCompatibilityVersion();
for (IndexMetaData closedMetaData : indicesToOpen) {
final String indexName = closedMetaData.getIndex().getName();
IndexMetaData indexMetaData = IndexMetaData.builder(closedMetaData).state(IndexMetaData.State.OPEN).build();
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final Version minIndexCompatibilityVersion = currentState.getNodes().getMaxNodeVersion().minimumIndexCompatibilityVersion();
for (IndexMetaData indexMetaData : indicesToOpen) {
final Index index = indexMetaData.getIndex();
if (indexMetaData.getState() != IndexMetaData.State.OPEN) {
IndexMetaData updatedIndexMetaData = IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.OPEN).build();
// The index might be closed because we couldn't import it due to old incompatible version
// We need to check that this index can be upgraded to the current version
indexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(indexMetaData, minIndexCompatibilityVersion);
updatedIndexMetaData = metaDataIndexUpgradeService.upgradeIndexMetaData(updatedIndexMetaData, minIndexCompatibilityVersion);
try {
indicesService.verifyIndexMetadata(indexMetaData, indexMetaData);
indicesService.verifyIndexMetadata(updatedIndexMetaData, updatedIndexMetaData);
} catch (Exception e) {
throw new ElasticsearchException("Failed to verify index " + indexMetaData.getIndex(), e);
throw new ElasticsearchException("Failed to verify index " + index, e);
}
metadata.put(updatedIndexMetaData, true);
}
mdBuilder.put(indexMetaData, true);
blocksBuilder.removeIndexBlock(indexName, INDEX_CLOSED_BLOCK);
// Always removes index closed blocks (note: this can fail on-going close index actions)
blocks.removeIndexBlockWithId(index.getName(), INDEX_CLOSED_BLOCK_ID);
}
ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build();
ClusterState updatedState = ClusterState.builder(currentState).metaData(metadata).blocks(blocks).build();
RoutingTable.Builder rtBuilder = RoutingTable.builder(updatedState.routingTable());
for (IndexMetaData index : indicesToOpen) {
rtBuilder.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(index.getIndex()));
final RoutingTable.Builder routingTable = RoutingTable.builder(updatedState.routingTable());
for (IndexMetaData previousIndexMetaData : indicesToOpen) {
if (previousIndexMetaData.getState() != IndexMetaData.State.OPEN) {
routingTable.addAsFromCloseToOpen(updatedState.metaData().getIndexSafe(previousIndexMetaData.getIndex()));
}
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask
return allocationService.reroute(
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(),
"indices opened [" + indicesAsString + "]");
}
});
return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
}
/**
@ -250,7 +538,6 @@ public class MetaDataIndexStateService {
ex.addValidationError(error.get());
throw ex;
}
}
private static int getTotalShardCount(ClusterState state, Index index) {
@ -258,4 +545,14 @@ public class MetaDataIndexStateService {
return indexMetaData.getNumberOfShards() * (1 + indexMetaData.getNumberOfReplicas());
}
/**
* @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The
* cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID.
*/
public static ClusterBlock createIndexClosingBlock() {
return new ClusterBlock(INDEX_CLOSED_BLOCK_ID, UUIDs.randomBase64UUID(), "index preparing to close. Reopen the index to allow " +
"writes again or retry closing the index to fully close the index.", false, false, false, RestStatus.FORBIDDEN,
EnumSet.of(ClusterBlockLevel.WRITE));
}
}

View File

@ -0,0 +1,330 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.indices.close;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.replication.ReplicationOperation;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.support.replication.TransportReplicationAction;
import org.elasticsearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.seqno.SeqNoStats;
import org.elasticsearch.index.seqno.SequenceNumbers;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.ReplicationGroup;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.transport.CapturingTransport;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportResponse;
import org.elasticsearch.transport.TransportService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state;
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.hamcrest.Matchers.arrayWithSize;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.instanceOf;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
private static ThreadPool threadPool;
private IndexShard indexShard;
private TransportVerifyShardBeforeCloseAction action;
private ClusterService clusterService;
private ClusterBlock clusterBlock;
private CapturingTransport transport;
@BeforeClass
public static void beforeClass() {
threadPool = new TestThreadPool(getTestClass().getName());
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
indexShard = mock(IndexShard.class);
when(indexShard.getActiveOperationsCount()).thenReturn(0);
when(indexShard.getGlobalCheckpoint()).thenReturn(0L);
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(0L, 0L, 0L));
final ShardId shardId = new ShardId("index", "_na_", randomIntBetween(0, 3));
when(indexShard.shardId()).thenReturn(shardId);
clusterService = createClusterService(threadPool);
clusterBlock = MetaDataIndexStateService.createIndexClosingBlock();
setState(clusterService, new ClusterState.Builder(clusterService.state())
.blocks(ClusterBlocks.builder().blocks(clusterService.state().blocks()).addIndexBlock("index", clusterBlock).build()).build());
transport = new CapturingTransport();
TransportService transportService = transport.createTransportService(Settings.EMPTY, threadPool,
TransportService.NOOP_TRANSPORT_INTERCEPTOR, x -> clusterService.localNode(), null, Collections.emptySet());
transportService.start();
transportService.acceptIncomingRequests();
ShardStateAction shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool);
action = new TransportVerifyShardBeforeCloseAction(Settings.EMPTY, transportService, clusterService, mock(IndicesService.class),
mock(ThreadPool.class), shardStateAction, mock(ActionFilters.class), mock(IndexNameExpressionResolver.class));
}
@Override
@After
public void tearDown() throws Exception {
super.tearDown();
clusterService.close();
}
@AfterClass
public static void afterClass() {
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
threadPool = null;
}
private void executeOnPrimaryOrReplica() throws Exception {
final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong());
final TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId);
if (randomBoolean()) {
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
} else {
assertNotNull(action.shardOperationOnPrimary(request, indexShard));
}
}
public void testOperationSuccessful() throws Exception {
executeOnPrimaryOrReplica();
verify(indexShard, times(1)).flush(any(FlushRequest.class));
}
public void testOperationFailsWithOnGoingOps() {
when(indexShard.getActiveOperationsCount()).thenReturn(randomIntBetween(1, 10));
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(),
equalTo("On-going operations in progress while checking index shard " + indexShard.shardId() + " before closing"));
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}
public void testOperationFailsWithNoBlock() {
setState(clusterService, new ClusterState.Builder(new ClusterName("test")).build());
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(),
equalTo("Index shard " + indexShard.shardId() + " must be blocked by " + clusterBlock + " before closing"));
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}
public void testOperationFailsWithGlobalCheckpointNotCaughtUp() {
final long maxSeqNo = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, Long.MAX_VALUE);
final long localCheckpoint = randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, maxSeqNo);
final long globalCheckpoint = randomValueOtherThan(maxSeqNo,
() -> randomLongBetween(SequenceNumbers.UNASSIGNED_SEQ_NO, localCheckpoint));
when(indexShard.seqNoStats()).thenReturn(new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint));
when(indexShard.getGlobalCheckpoint()).thenReturn(globalCheckpoint);
IllegalStateException exception = expectThrows(IllegalStateException.class, this::executeOnPrimaryOrReplica);
assertThat(exception.getMessage(), equalTo("Global checkpoint [" + globalCheckpoint + "] mismatches maximum sequence number ["
+ maxSeqNo + "] on index shard " + indexShard.shardId()));
verify(indexShard, times(0)).flush(any(FlushRequest.class));
}
public void testUnavailableShardsMarkedAsStale() throws Exception {
final String index = "test";
final ShardId shardId = new ShardId(index, "_na_", 0);
final int nbReplicas = randomIntBetween(1, 10);
final ShardRoutingState[] replicaStates = new ShardRoutingState[nbReplicas];
for (int i = 0; i < replicaStates.length; i++) {
replicaStates[i] = ShardRoutingState.STARTED;
}
final ClusterState clusterState = state(index, true, ShardRoutingState.STARTED, replicaStates);
setState(clusterService, clusterState);
IndexShardRoutingTable shardRoutingTable = clusterState.routingTable().index(index).shard(shardId.id());
final IndexMetaData indexMetaData = clusterState.getMetaData().index(index);
final ShardRouting primaryRouting = shardRoutingTable.primaryShard();
final long primaryTerm = indexMetaData.primaryTerm(0);
final Set<String> inSyncAllocationIds = indexMetaData.inSyncAllocationIds(0);
final Set<String> trackedShards = shardRoutingTable.getAllAllocationIds();
List<ShardRouting> unavailableShards = randomSubsetOf(randomIntBetween(1, nbReplicas), shardRoutingTable.replicaShards());
IndexShardRoutingTable.Builder shardRoutingTableBuilder = new IndexShardRoutingTable.Builder(shardRoutingTable);
unavailableShards.forEach(shardRoutingTableBuilder::removeShard);
shardRoutingTable = shardRoutingTableBuilder.build();
final ReplicationGroup replicationGroup = new ReplicationGroup(shardRoutingTable, inSyncAllocationIds, trackedShards);
assertThat(replicationGroup.getUnavailableInSyncShards().size(), greaterThan(0));
final PlainActionFuture<PrimaryResult> listener = new PlainActionFuture<>();
TaskId taskId = new TaskId(clusterService.localNode().getId(), 0L);
TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId, clusterBlock, taskId);
ReplicationOperation.Replicas<TransportVerifyShardBeforeCloseAction.ShardRequest> proxy = action.newReplicasProxy(primaryTerm);
ReplicationOperation<TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest, PrimaryResult> operation =
new ReplicationOperation<>(request, createPrimary(primaryRouting, replicationGroup), listener, proxy, logger, "test");
operation.execute();
final CapturingTransport.CapturedRequest[] capturedRequests = transport.getCapturedRequestsAndClear();
assertThat(capturedRequests.length, equalTo(nbReplicas));
for (CapturingTransport.CapturedRequest capturedRequest : capturedRequests) {
final String actionName = capturedRequest.action;
if (actionName.startsWith(ShardStateAction.SHARD_FAILED_ACTION_NAME)) {
assertThat(capturedRequest.request, instanceOf(ShardStateAction.FailedShardEntry.class));
String allocationId = ((ShardStateAction.FailedShardEntry) capturedRequest.request).getAllocationId();
assertTrue(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId)));
transport.handleResponse(capturedRequest.requestId, TransportResponse.Empty.INSTANCE);
} else if (actionName.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) {
assertThat(capturedRequest.request, instanceOf(ConcreteShardRequest.class));
String allocationId = ((ConcreteShardRequest) capturedRequest.request).getTargetAllocationID();
assertFalse(unavailableShards.stream().anyMatch(shardRouting -> shardRouting.allocationId().getId().equals(allocationId)));
assertTrue(inSyncAllocationIds.stream().anyMatch(inSyncAllocationId -> inSyncAllocationId.equals(allocationId)));
transport.handleResponse(capturedRequest.requestId, new TransportReplicationAction.ReplicaResponse(0L, 0L));
} else {
fail("Test does not support action " + capturedRequest.action);
}
}
final ReplicationResponse.ShardInfo shardInfo = listener.get().getShardInfo();
assertThat(shardInfo.getFailed(), equalTo(0));
assertThat(shardInfo.getFailures(), arrayWithSize(0));
assertThat(shardInfo.getSuccessful(), equalTo(1 + nbReplicas - unavailableShards.size()));
}
private static ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>
createPrimary(final ShardRouting primary, final ReplicationGroup replicationGroup) {
return new ReplicationOperation.Primary<
TransportVerifyShardBeforeCloseAction.ShardRequest,
TransportVerifyShardBeforeCloseAction.ShardRequest,
PrimaryResult>() {
@Override
public ShardRouting routingEntry() {
return primary;
}
@Override
public ReplicationGroup getReplicationGroup() {
return replicationGroup;
}
@Override
public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception {
return new PrimaryResult(request);
}
@Override
public void failShard(String message, Exception exception) {
}
@Override
public void updateLocalCheckpointForShard(String allocationId, long checkpoint) {
}
@Override
public void updateGlobalCheckpointForShard(String allocationId, long globalCheckpoint) {
}
@Override
public long localCheckpoint() {
return 0;
}
@Override
public long globalCheckpoint() {
return 0;
}
@Override
public long maxSeqNoOfUpdatesOrDeletes() {
return 0;
}
};
}
private static class PrimaryResult implements ReplicationOperation.PrimaryResult<TransportVerifyShardBeforeCloseAction.ShardRequest> {
private final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest;
private final SetOnce<ReplicationResponse.ShardInfo> shardInfo;
private PrimaryResult(final TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest) {
this.replicaRequest = replicaRequest;
this.shardInfo = new SetOnce<>();
}
@Override
public TransportVerifyShardBeforeCloseAction.ShardRequest replicaRequest() {
return replicaRequest;
}
@Override
public void setShardInfo(ReplicationResponse.ShardInfo shardInfo) {
this.shardInfo.set(shardInfo);
}
public ReplicationResponse.ShardInfo getShardInfo() {
return shardInfo.get();
}
}
}

View File

@ -20,36 +20,35 @@
package org.elasticsearch.cluster.block;
import org.elasticsearch.Version;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import static java.util.EnumSet.copyOf;
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
import static org.elasticsearch.test.VersionUtils.randomVersion;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.CoreMatchers.endsWith;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isOneOf;
import static org.hamcrest.Matchers.nullValue;
public class ClusterBlockTests extends ESTestCase {
public void testSerialization() throws Exception {
int iterations = randomIntBetween(10, 100);
int iterations = randomIntBetween(5, 20);
for (int i = 0; i < iterations; i++) {
// Get a random version
Version version = randomVersion(random());
// Get a random list of ClusterBlockLevels
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length);
for (int j = 0; j < nbLevels; j++) {
levels.add(randomFrom(ClusterBlockLevel.values()));
}
ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
randomBoolean(), false, randomFrom(RestStatus.values()), levels);
ClusterBlock clusterBlock = randomClusterBlock(version);
BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(version);
@ -59,37 +58,133 @@ public class ClusterBlockTests extends ESTestCase {
in.setVersion(version);
ClusterBlock result = ClusterBlock.readClusterBlock(in);
assertThat(result.id(), equalTo(clusterBlock.id()));
assertThat(result.status(), equalTo(clusterBlock.status()));
assertThat(result.description(), equalTo(clusterBlock.description()));
assertThat(result.retryable(), equalTo(clusterBlock.retryable()));
assertThat(result.disableStatePersistence(), equalTo(clusterBlock.disableStatePersistence()));
assertArrayEquals(result.levels().toArray(), clusterBlock.levels().toArray());
assertClusterBlockEquals(clusterBlock, result);
}
}
public void testBwcSerialization() throws Exception {
for (int runs = 0; runs < randomIntBetween(5, 20); runs++) {
// Generate a random cluster block in version < 7.0.0
final Version version = randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0));
final ClusterBlock expected = randomClusterBlock(version);
assertNull(expected.uuid());
// Serialize to node in current version
final BytesStreamOutput out = new BytesStreamOutput();
expected.writeTo(out);
// Deserialize and check the cluster block
final ClusterBlock actual = ClusterBlock.readClusterBlock(out.bytes().streamInput());
assertClusterBlockEquals(expected, actual);
}
for (int runs = 0; runs < randomIntBetween(5, 20); runs++) {
// Generate a random cluster block in current version
final ClusterBlock expected = randomClusterBlock(Version.CURRENT);
// Serialize to node in version < 7.0.0
final BytesStreamOutput out = new BytesStreamOutput();
out.setVersion(randomVersionBetween(random(), Version.V_6_0_0, getPreviousVersion(Version.V_7_0_0)));
expected.writeTo(out);
// Deserialize and check the cluster block
final StreamInput in = out.bytes().streamInput();
in.setVersion(out.getVersion());
final ClusterBlock actual = ClusterBlock.readClusterBlock(in);
assertThat(actual.id(), equalTo(expected.id()));
assertThat(actual.status(), equalTo(expected.status()));
assertThat(actual.description(), equalTo(expected.description()));
assertThat(actual.retryable(), equalTo(expected.retryable()));
assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence()));
assertArrayEquals(actual.levels().toArray(), expected.levels().toArray());
}
}
public void testToStringDanglingComma() {
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length);
for (int j = 0; j < nbLevels; j++) {
levels.add(randomFrom(ClusterBlockLevel.values()));
}
ClusterBlock clusterBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
randomBoolean(), false, randomFrom(RestStatus.values()), levels);
final ClusterBlock clusterBlock = randomClusterBlock();
assertThat(clusterBlock.toString(), not(endsWith(",")));
}
public void testGlobalBlocksCheckedIfNoIndicesSpecified() {
EnumSet<ClusterBlockLevel> levels = EnumSet.noneOf(ClusterBlockLevel.class);
int nbLevels = randomIntBetween(1, ClusterBlockLevel.values().length);
for (int j = 0; j < nbLevels; j++) {
levels.add(randomFrom(ClusterBlockLevel.values()));
}
ClusterBlock globalBlock = new ClusterBlock(randomInt(), "cluster block #" + randomInt(), randomBoolean(),
randomBoolean(), false, randomFrom(RestStatus.values()), levels);
ClusterBlock globalBlock = randomClusterBlock();
ClusterBlocks clusterBlocks = new ClusterBlocks(Collections.singleton(globalBlock), ImmutableOpenMap.of());
ClusterBlockException exception = clusterBlocks.indicesBlockedException(randomFrom(globalBlock.levels()), new String[0]);
assertNotNull(exception);
assertEquals(exception.blocks(), Collections.singleton(globalBlock));
}
public void testRemoveIndexBlockWithId() {
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
builder.addIndexBlock("index-1",
new ClusterBlock(1, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-1",
new ClusterBlock(2, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-1",
new ClusterBlock(3, "uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-1",
new ClusterBlock(3, "other uuid", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
builder.addIndexBlock("index-2",
new ClusterBlock(3, "uuid3", "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL)));
ClusterBlocks clusterBlocks = builder.build();
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(4));
assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1));
builder.removeIndexBlockWithId("index-1", 3);
clusterBlocks = builder.build();
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2));
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true));
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true));
assertThat(clusterBlocks.indices().get("index-2").size(), equalTo(1));
assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(true));
builder.removeIndexBlockWithId("index-2", 3);
clusterBlocks = builder.build();
assertThat(clusterBlocks.indices().get("index-1").size(), equalTo(2));
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 1), is(true));
assertThat(clusterBlocks.hasIndexBlockWithId("index-1", 2), is(true));
assertThat(clusterBlocks.indices().get("index-2"), nullValue());
assertThat(clusterBlocks.hasIndexBlockWithId("index-2", 3), is(false));
}
public void testGetIndexBlockWithId() {
final int blockId = randomInt();
final ClusterBlock[] clusterBlocks = new ClusterBlock[randomIntBetween(1, 5)];
final ClusterBlocks.Builder builder = ClusterBlocks.builder();
for (int i = 0; i < clusterBlocks.length; i++) {
clusterBlocks[i] = new ClusterBlock(blockId, "uuid" + i, "", true, true, true, RestStatus.OK, copyOf(ClusterBlockLevel.ALL));
builder.addIndexBlock("index", clusterBlocks[i]);
}
assertThat(builder.build().indices().get("index").size(), equalTo(clusterBlocks.length));
assertThat(builder.build().getIndexBlockWithId("index", blockId), isOneOf(clusterBlocks));
assertThat(builder.build().getIndexBlockWithId("index", randomValueOtherThan(blockId, ESTestCase::randomInt)), nullValue());
}
private ClusterBlock randomClusterBlock() {
return randomClusterBlock(randomVersion(random()));
}
private ClusterBlock randomClusterBlock(final Version version) {
final String uuid = (version.onOrAfter(Version.V_7_0_0) && randomBoolean()) ? UUIDs.randomBase64UUID() : null;
final List<ClusterBlockLevel> levels = Arrays.asList(ClusterBlockLevel.values());
return new ClusterBlock(randomInt(), uuid, "cluster block #" + randomInt(), randomBoolean(), randomBoolean(), randomBoolean(),
randomFrom(RestStatus.values()), copyOf(randomSubsetOf(randomIntBetween(1, levels.size()), levels)));
}
private void assertClusterBlockEquals(final ClusterBlock expected, final ClusterBlock actual) {
assertEquals(expected, actual);
assertThat(actual.id(), equalTo(expected.id()));
assertThat(actual.uuid(), equalTo(expected.uuid()));
assertThat(actual.status(), equalTo(expected.status()));
assertThat(actual.description(), equalTo(expected.description()));
assertThat(actual.retryable(), equalTo(expected.retryable()));
assertThat(actual.disableStatePersistence(), equalTo(expected.disableStatePersistence()));
assertArrayEquals(actual.levels().toArray(), expected.levels().toArray());
}
}

View File

@ -19,28 +19,225 @@
package org.elasticsearch.cluster.metadata;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESTestCase;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static java.util.Collections.emptyMap;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableMap;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class MetaDataIndexStateServiceTests extends ESTestCase {
public void testCloseRoutingTable() {
final Set<Index> nonBlockedIndices = new HashSet<>();
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
final Map<Index, AcknowledgedResponse> results = new HashMap<>();
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build();
for (int i = 0; i < randomIntBetween(1, 25); i++) {
final String indexName = "index-" + i;
if (randomBoolean()) {
state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
nonBlockedIndices.add(state.metaData().index(indexName).getIndex());
} else {
final ClusterBlock closingBlock = MetaDataIndexStateService.createIndexClosingBlock();
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state, closingBlock);
blockedIndices.put(state.metaData().index(indexName).getIndex(), closingBlock);
results.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
}
}
final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size()));
for (Index nonBlockedIndex : nonBlockedIndices) {
assertIsOpened(nonBlockedIndex.getName(), updatedState);
assertThat(updatedState.blocks().hasIndexBlockWithId(nonBlockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(false));
}
for (Index blockedIndex : blockedIndices.keySet()) {
if (results.get(blockedIndex).isAcknowledged()) {
assertIsClosed(blockedIndex.getName(), updatedState);
} else {
assertIsOpened(blockedIndex.getName(), updatedState);
assertThat(updatedState.blocks().hasIndexBlockWithId(blockedIndex.getName(), INDEX_CLOSED_BLOCK_ID), is(true));
}
}
}
public void testAddIndexClosedBlocks() {
final ClusterState initialState = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build();
{
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
Index[] indices = new Index[]{new Index("_name", "_uid")};
expectThrows(IndexNotFoundException.class, () ->
MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState));
assertTrue(blockedIndices.isEmpty());
}
{
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
Index[] indices = Index.EMPTY_ARRAY;
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, initialState);
assertSame(initialState, updatedState);
assertTrue(blockedIndices.isEmpty());
}
{
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
Index[] indices = new Index[]{state.metaData().index("closed").getIndex()};
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
assertSame(state, updatedState);
assertTrue(blockedIndices.isEmpty());
}
{
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
Index[] indices = new Index[]{state.metaData().index("opened").getIndex(), state.metaData().index("closed").getIndex()};
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
assertNotSame(state, updatedState);
Index opened = updatedState.metaData().index("opened").getIndex();
assertTrue(blockedIndices.containsKey(opened));
assertHasBlock("opened", updatedState, blockedIndices.get(opened));
Index closed = updatedState.metaData().index("closed").getIndex();
assertFalse(blockedIndices.containsKey(closed));
}
{
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
ClusterState state = addRestoredIndex("restored", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
if (randomBoolean()) {
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
if (randomBoolean()) {
state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
Index[] indices = new Index[]{state.metaData().index("restored").getIndex()};
MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state);
});
assertThat(exception.getMessage(), containsString("Cannot close indices that are being restored: [[restored]]"));
}
{
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
ClusterState state = addSnapshotIndex("snapshotted", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
if (randomBoolean()) {
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
if (randomBoolean()) {
state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
Index[] indices = new Index[]{state.metaData().index("snapshotted").getIndex()};
MetaDataIndexStateService.addIndexClosedBlocks(indices, unmodifiableMap(emptyMap()), state);
});
assertThat(exception.getMessage(), containsString("Cannot close indices that are being snapshotted: [[snapshotted]]"));
}
{
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
ClusterState state = addOpenedIndex("index-1", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
state = addOpenedIndex("index-2", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
state = addOpenedIndex("index-3", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
final boolean mixedVersions = randomBoolean();
if (mixedVersions) {
state = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes())
.add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_6_0_0)))
.build();
}
Index index1 = state.metaData().index("index-1").getIndex();
Index index2 = state.metaData().index("index-2").getIndex();
Index index3 = state.metaData().index("index-3").getIndex();
Index[] indices = new Index[]{index1, index2, index3};
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
assertNotSame(state, updatedState);
for (Index index : indices) {
assertTrue(blockedIndices.containsKey(index));
if (mixedVersions) {
assertIsClosed(index.getName(), updatedState);
} else {
assertHasBlock(index.getName(), updatedState, blockedIndices.get(index));
}
}
}
}
public void testAddIndexClosedBlocksReusesBlocks() {
ClusterState state = ClusterState.builder(new ClusterName("testAddIndexClosedBlocksReuseBlocks")).build();
state = addOpenedIndex("test", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
Index test = state.metaData().index("test").getIndex();
Index[] indices = new Index[]{test};
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
assertTrue(blockedIndices.containsKey(test));
assertHasBlock(test.getName(), state, blockedIndices.get(test));
final Map<Index, ClusterBlock> blockedIndices2 = new HashMap<>();
state = MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices2, state);
assertTrue(blockedIndices2.containsKey(test));
assertHasBlock(test.getName(), state, blockedIndices2.get(test));
assertEquals(blockedIndices.get(test), blockedIndices2.get(test));
}
public void testValidateShardLimit() {
int nodesInCluster = randomIntBetween(2,100);
ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
@ -55,7 +252,6 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
.collect(Collectors.toList())
.toArray(new Index[2]);
DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
int maxShards = counts.getShardsPerNode() * nodesInCluster;
@ -69,32 +265,123 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
int closedIndexShards, int closedIndexReplicas, Settings clusterSettings) {
ImmutableOpenMap.Builder<String, DiscoveryNode> dataNodes = ImmutableOpenMap.builder();
for (int i = 0; i < nodesInCluster; i++) {
dataNodes.put(randomAlphaOfLengthBetween(5,15), mock(DiscoveryNode.class));
dataNodes.put(randomAlphaOfLengthBetween(5, 15), mock(DiscoveryNode.class));
}
DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.getDataNodes()).thenReturn(dataNodes.build());
IndexMetaData.Builder openIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15))
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.creationDate(randomLong())
.numberOfShards(openIndexShards)
.numberOfReplicas(openIndexReplicas);
IndexMetaData.Builder closedIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15))
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.creationDate(randomLong())
.state(IndexMetaData.State.CLOSE)
.numberOfShards(closedIndexShards)
.numberOfReplicas(closedIndexReplicas);
MetaData.Builder metaData = MetaData.builder().put(openIndexMetaData).put(closedIndexMetaData);
ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), openIndexShards, openIndexReplicas, state);
state = addClosedIndex(randomAlphaOfLengthBetween(5, 15), closedIndexShards, closedIndexReplicas, state);
final MetaData.Builder metaData = MetaData.builder(state.metaData());
if (randomBoolean()) {
metaData.persistentSettings(clusterSettings);
} else {
metaData.transientSettings(clusterSettings);
}
return ClusterState.builder(state).metaData(metaData).nodes(nodes).build();
}
return ClusterState.builder(ClusterName.DEFAULT)
.metaData(metaData)
.nodes(nodes)
private static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, null);
}
private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, INDEX_CLOSED_BLOCK);
}
private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state,
final ClusterBlock closingBlock) {
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, closingBlock);
}
private static ClusterState addRestoredIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
ClusterState newState = addOpenedIndex(index, numShards, numReplicas, state);
final ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) {
shardsBuilder.put(shardRouting.shardId(), new RestoreInProgress.ShardRestoreStatus(shardRouting.currentNodeId()));
}
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
final RestoreInProgress.Entry entry =
new RestoreInProgress.Entry("_uuid", snapshot, RestoreInProgress.State.INIT, ImmutableList.of(index), shardsBuilder.build());
return ClusterState.builder(newState)
.putCustom(RestoreInProgress.TYPE, new RestoreInProgress.Builder().add(entry).build())
.build();
}
private static ClusterState addSnapshotIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
ClusterState newState = addOpenedIndex(index, numShards, numReplicas, state);
final ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) {
shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId()));
}
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
final SnapshotsInProgress.Entry entry =
new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build());
return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build();
}
private static ClusterState addIndex(final ClusterState currentState,
final String index,
final int numShards,
final int numReplicas,
final IndexMetaData.State state,
@Nullable final ClusterBlock block) {
final IndexMetaData indexMetaData = IndexMetaData.builder(index)
.state(state)
.creationDate(randomNonNegativeLong())
.settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, numShards)
.put(SETTING_NUMBER_OF_REPLICAS, numReplicas))
.build();
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
clusterStateBuilder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));
if (state == IndexMetaData.State.OPEN) {
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
}
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
}
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());
}
if (block != null) {
clusterStateBuilder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).addIndexBlock(index, block));
}
return clusterStateBuilder.build();
}
private static void assertIsOpened(final String indexName, final ClusterState clusterState) {
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().index(indexName), notNullValue());
}
private static void assertIsClosed(final String indexName, final ClusterState clusterState) {
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE));
assertThat(clusterState.routingTable().index(indexName), nullValue());
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
}
private static void assertHasBlock(final String indexName, final ClusterState clusterState, final ClusterBlock closingBlock) {
assertThat(clusterState.blocks().hasIndexBlock(indexName, closingBlock), is(true));
assertThat("Index " + indexName + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
clusterState.blocks().indices().getOrDefault(indexName, emptySet()).stream()
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
}
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.index.Index;
import java.util.Map;
public class MetaDataIndexStateServiceUtils {
private MetaDataIndexStateServiceUtils(){
}
/**
* Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], Map, ClusterState)} which is a protected method.
*/
public static ClusterState addIndexClosedBlocks(final Index[] indices, final Map<Index, ClusterBlock> blockedIndices,
final ClusterState state) {
return MetaDataIndexStateService.addIndexClosedBlocks(indices, blockedIndices, state);
}
/**
* Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map, Map)} which is a protected method.
*/
public static ClusterState closeRoutingTable(final ClusterState state,
final Map<Index, ClusterBlock> blockedIndices,
final Map<Index, AcknowledgedResponse> results) {
return MetaDataIndexStateService.closeRoutingTable(state, blockedIndices, results);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -39,6 +40,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils;
@ -50,6 +52,7 @@ import org.elasticsearch.cluster.EmptyClusterInfoService;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction.FailedShardEntry;
import org.elasticsearch.cluster.action.shard.ShardStateAction.StartedShardEntry;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.coordination.JoinTaskExecutor;
import org.elasticsearch.cluster.coordination.NodeRemovalClusterStateTaskExecutor;
import org.elasticsearch.cluster.metadata.AliasValidator;
@ -58,6 +61,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateServiceUtils;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNode;
@ -77,6 +81,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener;
@ -90,8 +95,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
@ -179,8 +187,11 @@ public class ClusterStateChanges {
return indexMetaData;
}
};
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
transportService, clusterService, indicesService, threadPool, null, actionFilters, indexNameExpressionResolver);
MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(clusterService, allocationService,
metaDataIndexUpgradeService, indicesService, threadPool);
metaDataIndexUpgradeService, indicesService, threadPool, transportVerifyShardBeforeCloseAction);
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(SETTINGS, clusterService, allocationService);
MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(clusterService,
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool);
@ -210,7 +221,15 @@ public class ClusterStateChanges {
}
public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) {
return execute(transportCloseIndexAction, request, state);
final Index[] concreteIndices = Arrays.stream(request.indices())
.map(index -> state.metaData().index(index).getIndex()).toArray(Index[]::new);
final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();
ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, blockedIndices, state);
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices, blockedIndices.keySet().stream()
.collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true))));
return allocationService.reroute(newState, "indices closed");
}
public ClusterState openIndices(ClusterState state, OpenIndexRequest request) {

View File

@ -0,0 +1,344 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.state;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class CloseIndexIT extends ESIntegTestCase {
public void testCloseMissingIndex() {
IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareClose("test").get());
assertThat(e.getMessage(), is("no such index [test]"));
}
public void testCloseOneMissingIndex() {
createIndex("test1");
final IndexNotFoundException e = expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().prepareClose("test1", "test2").get());
assertThat(e.getMessage(), is("no such index [test2]"));
}
public void testCloseOneMissingIndexIgnoreMissing() {
createIndex("test1");
assertAcked(client().admin().indices().prepareClose("test1", "test2").setIndicesOptions(IndicesOptions.lenientExpandOpen()));
assertIndexIsClosed("test1");
}
public void testCloseNoIndex() {
final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class,
() -> client().admin().indices().prepareClose().get());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testCloseNullIndex() {
final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class,
() -> client().admin().indices().prepareClose((String[])null).get());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testCloseIndex() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
final int nbDocs = randomIntBetween(0, 50);
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
assertAcked(client().admin().indices().prepareOpen(indexName));
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
}
public void testCloseAlreadyClosedIndex() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
if (randomBoolean()) {
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10))
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
}
// First close should be acked
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
// Second close should be acked too
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
}
public void testCloseUnassignedIndex() {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
assertAcked(prepareCreate(indexName)
.setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(Settings.builder().put("index.routing.allocation.include._name", "nothing").build()));
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true));
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
}
public void testConcurrentClose() throws InterruptedException {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
final int nbDocs = randomIntBetween(10, 50);
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
ensureYellowAndNoInitializingShards(indexName);
final CountDownLatch startClosing = new CountDownLatch(1);
final Thread[] threads = new Thread[randomIntBetween(2, 5)];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try {
startClosing.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
try {
client().admin().indices().prepareClose(indexName).get();
} catch (final Exception e) {
assertException(e, indexName);
}
});
threads[i].start();
}
startClosing.countDown();
for (Thread thread : threads) {
thread.join();
}
assertIndexIsClosed(indexName);
}
public void testCloseWhileIndexingDocuments() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
int nbDocs = 0;
try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client())) {
indexer.setAssertNoFailuresOnStop(false);
waitForDocs(randomIntBetween(10, 50), indexer);
assertAcked(client().admin().indices().prepareClose(indexName));
indexer.stop();
nbDocs += indexer.totalIndexedDocs();
final Throwable[] failures = indexer.getFailures();
if (failures != null) {
for (Throwable failure : failures) {
assertException(failure, indexName);
}
}
}
assertIndexIsClosed(indexName);
assertAcked(client().admin().indices().prepareOpen(indexName));
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
}
public void testCloseWhileDeletingIndices() throws Exception {
final String[] indices = new String[randomIntBetween(3, 10)];
for (int i = 0; i < indices.length; i++) {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
if (randomBoolean()) {
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, 10)
.mapToObj(n -> client().prepareIndex(indexName, "_doc", String.valueOf(n)).setSource("num", n)).collect(toList()));
}
indices[i] = indexName;
}
assertThat(client().admin().cluster().prepareState().get().getState().metaData().indices().size(), equalTo(indices.length));
final List<Thread> threads = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);
for (final String indexToDelete : indices) {
threads.add(new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
try {
assertAcked(client().admin().indices().prepareDelete(indexToDelete));
} catch (final Exception e) {
assertException(e, indexToDelete);
}
}));
}
for (final String indexToClose : indices) {
threads.add(new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
try {
client().admin().indices().prepareClose(indexToClose).get();
} catch (final Exception e) {
assertException(e, indexToClose);
}
}));
}
for (Thread thread : threads) {
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
public void testConcurrentClosesAndOpens() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
final BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client());
waitForDocs(1, indexer);
final CountDownLatch latch = new CountDownLatch(1);
final Runnable waitForLatch = () -> {
try {
latch.await();
} catch (final InterruptedException e) {
throw new AssertionError(e);
}
};
final List<Thread> threads = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 3); i++) {
threads.add(new Thread(() -> {
try {
waitForLatch.run();
client().admin().indices().prepareClose(indexName).get();
} catch (final Exception e) {
throw new AssertionError(e);
}
}));
}
for (int i = 0; i < randomIntBetween(1, 3); i++) {
threads.add(new Thread(() -> {
try {
waitForLatch.run();
assertAcked(client().admin().indices().prepareOpen(indexName).get());
} catch (final Exception e) {
throw new AssertionError(e);
}
}));
}
for (Thread thread : threads) {
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
indexer.setAssertNoFailuresOnStop(false);
indexer.stop();
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
if (clusterState.metaData().indices().get(indexName).getState() == IndexMetaData.State.CLOSE) {
assertIndexIsClosed(indexName);
assertAcked(client().admin().indices().prepareOpen(indexName));
}
refresh(indexName);
assertIndexIsOpened(indexName);
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), indexer.totalIndexedDocs());
}
static void assertIndexIsClosed(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.CLOSE));
assertThat(clusterState.routingTable().index(index), nullValue());
assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
assertThat("Index " + index + " must have only 1 block with [id=" + MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID + "]",
clusterState.blocks().indices().getOrDefault(index, emptySet()).stream()
.filter(clusterBlock -> clusterBlock.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
}
}
static void assertIndexIsOpened(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().index(index), notNullValue());
assertThat(clusterState.blocks().hasIndexBlock(index, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false));
}
}
static void assertException(final Throwable throwable, final String indexName) {
final Throwable t = ExceptionsHelper.unwrapCause(throwable);
if (t instanceof ClusterBlockException) {
ClusterBlockException clusterBlockException = (ClusterBlockException) t;
assertThat(clusterBlockException.blocks(), hasSize(1));
assertTrue(clusterBlockException.blocks().stream().allMatch(b -> b.id() == MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID));
} else if (t instanceof IndexClosedException) {
IndexClosedException indexClosedException = (IndexClosedException) t;
assertThat(indexClosedException.getIndex(), notNullValue());
assertThat(indexClosedException.getIndex().getName(), equalTo(indexName));
} else if (t instanceof IndexNotFoundException) {
IndexNotFoundException indexNotFoundException = (IndexNotFoundException) t;
assertThat(indexNotFoundException.getIndex(), notNullValue());
assertThat(indexNotFoundException.getIndex().getName(), equalTo(indexName));
} else {
fail("Unexpected exception: " + t);
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.state;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.elasticsearch.indices.state.CloseIndexIT.assertException;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 10)
.put(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
.build();
}
@Override
protected int numberOfReplicas() {
return 1;
}
public void testCloseWhileRelocatingShards() throws Exception {
final String[] indices = new String[randomIntBetween(1, 3)];
final Map<String, Long> docsPerIndex = new HashMap<>();
for (int i = 0; i < indices.length; i++) {
final String indexName = "index-" + i;
createIndex(indexName);
int nbDocs = 0;
if (randomBoolean()) {
nbDocs = randomIntBetween(1, 20);
for (int j = 0; j < nbDocs; j++) {
IndexResponse indexResponse = client().prepareIndex(indexName, "_doc").setSource("num", j).get();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
}
docsPerIndex.put(indexName, (long) nbDocs);
indices[i] = indexName;
}
ensureGreen(indices);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.toString())));
// start some concurrent indexing threads
final Map<String, BackgroundIndexer> indexers = new HashMap<>();
for (final String index : indices) {
if (randomBoolean()) {
final BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client(), -1, scaledRandomIntBetween(1, 3));
waitForDocs(1, indexer);
indexers.put(index, indexer);
}
}
final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
final String newNode = internalCluster().startDataOnlyNode();
try {
final CountDownLatch latch = new CountDownLatch(1);
final List<Thread> threads = new ArrayList<>();
// start shards relocating threads
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final String indexToRelocate : indices) {
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexToRelocate);
for (int i = 0; i < getNumShards(indexToRelocate).numPrimaries; i++) {
final int shardId = i;
ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
assertTrue(primary.started());
ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
assertTrue(replica.started());
final String currentNodeId = randomBoolean() ? primary.currentNodeId() : replica.currentNodeId();
assertNotNull(currentNodeId);
final Thread thread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
assertAcked(client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(indexToRelocate, shardId, currentNodeId, newNode)));
});
threads.add(thread);
thread.start();
}
}
// start index closing threads
for (final String indexToClose : indices) {
final Thread thread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
AcknowledgedResponse closeResponse = client().admin().indices().prepareClose(indexToClose).get();
if (closeResponse.isAcknowledged()) {
assertTrue("Index closing should not be acknowledged twice", acknowledgedCloses.add(indexToClose));
}
});
threads.add(thread);
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
final BackgroundIndexer indexer = entry.getValue();
indexer.setAssertNoFailuresOnStop(false);
indexer.stop();
final String indexName = entry.getKey();
docsPerIndex.computeIfPresent(indexName, (key, value) -> value + indexer.totalIndexedDocs());
final Throwable[] failures = indexer.getFailures();
if (failures != null) {
for (Throwable failure : failures) {
assertException(failure, indexName);
}
}
}
} finally {
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
}
for (String index : indices) {
if (acknowledgedCloses.contains(index)) {
assertIndexIsClosed(index);
} else {
assertIndexIsOpened(index);
}
}
assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0));
assertAcked(client().admin().indices().prepareOpen("index-*"));
ensureGreen(indices);
for (String index : acknowledgedCloses) {
long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount
+ " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount);
}
}
}

View File

@ -21,7 +21,6 @@ package org.elasticsearch.indices.state;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
@ -46,6 +45,8 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_RE
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_BLOCKS_WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_READ_ONLY_ALLOW_DELETE;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
@ -53,7 +54,6 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFa
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
public class OpenCloseIndexIT extends ESIntegTestCase {
public void testSimpleCloseOpen() {
@ -72,13 +72,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertIndexIsOpened("test1");
}
public void testSimpleCloseMissingIndex() {
Client client = client();
Exception e = expectThrows(IndexNotFoundException.class, () ->
client.admin().indices().prepareClose("test1").execute().actionGet());
assertThat(e.getMessage(), is("no such index [test1]"));
}
public void testSimpleOpenMissingIndex() {
Client client = client();
Exception e = expectThrows(IndexNotFoundException.class, () ->
@ -86,27 +79,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertThat(e.getMessage(), is("no such index [test1]"));
}
public void testCloseOneMissingIndex() {
Client client = client();
createIndex("test1");
ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
Exception e = expectThrows(IndexNotFoundException.class, () ->
client.admin().indices().prepareClose("test1", "test2").execute().actionGet());
assertThat(e.getMessage(), is("no such index [test2]"));
}
public void testCloseOneMissingIndexIgnoreMissing() {
Client client = client();
createIndex("test1");
ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test1", "test2")
.setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosed("test1");
}
public void testOpenOneMissingIndex() {
Client client = client();
createIndex("test1");
@ -200,20 +172,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertIndexIsOpened("test1", "test2", "test3");
}
public void testCloseNoIndex() {
Client client = client();
Exception e = expectThrows(ActionRequestValidationException.class, () ->
client.admin().indices().prepareClose().execute().actionGet());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testCloseNullIndex() {
Client client = client();
Exception e = expectThrows(ActionRequestValidationException.class, () ->
client.admin().indices().prepareClose((String[])null).execute().actionGet());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testOpenNoIndex() {
Client client = client();
Exception e = expectThrows(ActionRequestValidationException.class, () ->
@ -241,23 +199,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertIndexIsOpened("test1");
}
public void testCloseAlreadyClosedIndex() {
Client client = client();
createIndex("test1");
ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
//closing the index
AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosed("test1");
//no problem if we try to close an index that's already in close state
closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosed("test1");
}
public void testSimpleCloseOpenAlias() {
Client client = client();
createIndex("test1");
@ -317,23 +258,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
ensureGreen("test");
}
private void assertIndexIsOpened(String... indices) {
checkIndexState(IndexMetaData.State.OPEN, indices);
}
private void assertIndexIsClosed(String... indices) {
checkIndexState(IndexMetaData.State.CLOSE, indices);
}
private void checkIndexState(IndexMetaData.State expectedState, String... indices) {
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().execute().actionGet();
for (String index : indices) {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().indices().get(index);
assertThat(indexMetaData, notNullValue());
assertThat(indexMetaData.getState(), equalTo(expectedState));
}
}
public void testOpenCloseWithDocs() throws IOException, ExecutionException, InterruptedException {
String mapping = Strings.toString(XContentFactory.jsonBuilder().
startObject().

View File

@ -0,0 +1,167 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.state;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Glob;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.util.concurrent.RunOnce;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.elasticsearch.cluster.metadata.MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, minNumDataNodes = 2)
public class ReopenWhileClosingIT extends ESIntegTestCase {
@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return singletonList(MockTransportService.TestPlugin.class);
}
public void testReopenDuringClose() throws Exception {
final String indexName = "test";
createIndexWithDocs(indexName);
ensureYellowAndNoInitializingShards(indexName);
final CountDownLatch block = new CountDownLatch(1);
final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(indexName, block::countDown);
ActionFuture<AcknowledgedResponse> closeIndexResponse = client().admin().indices().prepareClose(indexName).execute();
assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS));
assertIndexIsBlocked(indexName);
assertFalse(closeIndexResponse.isDone());
assertAcked(client().admin().indices().prepareOpen(indexName));
releaseBlock.close();
assertFalse(closeIndexResponse.get().isAcknowledged());
assertIndexIsOpened(indexName);
}
public void testReopenDuringCloseOnMultipleIndices() throws Exception {
final List<String> indices = new ArrayList<>();
for (int i = 0; i < randomIntBetween(2, 10); i++) {
indices.add("index-" + i);
createIndexWithDocs(indices.get(i));
}
ensureYellowAndNoInitializingShards(indices.toArray(Strings.EMPTY_ARRAY));
final CountDownLatch block = new CountDownLatch(1);
final Releasable releaseBlock = interceptVerifyShardBeforeCloseActions(randomFrom(indices), block::countDown);
ActionFuture<AcknowledgedResponse> closeIndexResponse = client().admin().indices().prepareClose("index-*").execute();
assertTrue("Waiting for index to have a closing blocked", block.await(60, TimeUnit.SECONDS));
assertFalse(closeIndexResponse.isDone());
indices.forEach(ReopenWhileClosingIT::assertIndexIsBlocked);
final List<String> reopenedIndices = randomSubsetOf(randomIntBetween(1, indices.size()), indices);
assertAcked(client().admin().indices().prepareOpen(reopenedIndices.toArray(Strings.EMPTY_ARRAY)));
releaseBlock.close();
assertFalse(closeIndexResponse.get().isAcknowledged());
indices.forEach(index -> {
if (reopenedIndices.contains(index)) {
assertIndexIsOpened(index);
} else {
assertIndexIsClosed(index);
}
});
}
private void createIndexWithDocs(final String indexName) {
createIndex(indexName);
final int nbDocs = scaledRandomIntBetween(1, 100);
for (int i = 0; i < nbDocs; i++) {
index(indexName, "_doc", String.valueOf(i), "num", i);
}
assertIndexIsOpened(indexName);
}
/**
* Intercepts and blocks the {@link TransportVerifyShardBeforeCloseAction} executed for the given index pattern.
*/
private Releasable interceptVerifyShardBeforeCloseActions(final String indexPattern, final Runnable onIntercept) {
final MockTransportService mockTransportService = (MockTransportService) internalCluster()
.getInstance(TransportService.class, internalCluster().getMasterName());
final CountDownLatch release = new CountDownLatch(1);
for (DiscoveryNode node : internalCluster().clusterService().state().getNodes()) {
mockTransportService.addSendBehavior(internalCluster().getInstance(TransportService.class, node.getName()),
(connection, requestId, action, request, options) -> {
if (action.startsWith(TransportVerifyShardBeforeCloseAction.NAME)) {
if (request instanceof TransportVerifyShardBeforeCloseAction.ShardRequest) {
final String index = ((TransportVerifyShardBeforeCloseAction.ShardRequest) request).shardId().getIndexName();
if (Glob.globMatch(indexPattern, index)) {
logger.info("request {} intercepted for index {}", requestId, index);
onIntercept.run();
try {
release.await();
logger.info("request {} released for index {}", requestId, index);
} catch (final InterruptedException e) {
throw new AssertionError(e);
}
}
}
}
connection.sendRequest(requestId, action, request, options);
});
}
final RunOnce releaseOnce = new RunOnce(release::countDown);
return releaseOnce::run;
}
private static void assertIndexIsBlocked(final String... indices) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (String index : indices) {
assertThat(clusterState.metaData().indices().get(index).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().index(index), notNullValue());
assertThat("Index " + index + " must have only 1 block with [id=" + INDEX_CLOSED_BLOCK_ID + "]",
clusterState.blocks().indices().getOrDefault(index, emptySet()).stream()
.filter(clusterBlock -> clusterBlock.id() == INDEX_CLOSED_BLOCK_ID).count(), equalTo(1L));
}
}
}

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -35,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.test.ESIntegTestCase;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;
@ -61,8 +61,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").get();
logger.info("--> closing test index...");
AcknowledgedResponse closeIndexResponse = client().admin().indices().prepareClose("test").get();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertAcked(client().admin().indices().prepareClose("test"));
stateResponse = client().admin().cluster().prepareState().get();
assertThat(stateResponse.getState().metaData().index("test").getState(), equalTo(IndexMetaData.State.CLOSE));
@ -103,7 +102,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED));
client().admin().indices().prepareClose("test").get();
assertAcked(client().admin().indices().prepareClose("test"));
logger.info("--> updating test index settings to allow allocation");
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()

View File

@ -544,7 +544,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
logger.info("--> start snapshot with default settings and closed index - should be blocked");
assertBlocked(client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setIndices("test-idx-all", "test-idx-none", "test-idx-some", "test-idx-closed")
.setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
.setWaitForCompletion(true), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
logger.info("--> start snapshot with default settings without a closed index - should fail");
@ -603,7 +603,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
equalTo(SnapshotState.PARTIAL));
}
assertAcked(client().admin().indices().prepareClose("test-idx-some", "test-idx-all").execute().actionGet());
assertAcked(client().admin().indices().prepareClose("test-idx-some", "test-idx-all"));
logger.info("--> restore incomplete snapshot - should fail");
assertThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false)

View File

@ -1562,7 +1562,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> snapshot with closed index");
assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap").setWaitForCompletion(true)
.setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
.setIndices("test-idx", "test-idx-closed"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
}
public void testSnapshotSingleClosedIndex() throws Exception {
@ -1580,7 +1580,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
logger.info("--> snapshot");
assertBlocked(client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap-1")
.setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
.setWaitForCompletion(true).setIndices("test-idx"), MetaDataIndexStateService.INDEX_CLOSED_BLOCK_ID);
}
public void testRenameOnRestore() throws Exception {

View File

@ -20,12 +20,10 @@ package org.elasticsearch.test;/*
import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
@ -63,6 +61,7 @@ public class BackgroundIndexer implements AutoCloseable {
final Semaphore availableBudget = new Semaphore(0);
final boolean useAutoGeneratedIDs;
private final Set<String> ids = ConcurrentCollections.newConcurrentSet();
private boolean assertNoFailuresOnStop = true;
volatile int minFieldSize = 10;
volatile int maxFieldSize = 140;
@ -163,13 +162,11 @@ public class BackgroundIndexer implements AutoCloseable {
}
BulkResponse bulkResponse = bulkRequest.get();
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (!bulkItemResponse.isFailed()) {
if (bulkItemResponse.isFailed() == false) {
boolean add = ids.add(bulkItemResponse.getId());
assert add : "ID: " + bulkItemResponse.getId() + " already used";
} else {
throw new ElasticsearchException("bulk request failure, id: ["
+ bulkItemResponse.getFailure().getId() + "] message: "
+ bulkItemResponse.getFailure().getMessage());
failures.add(bulkItemResponse.getFailure().getCause());
}
}
@ -283,8 +280,10 @@ public class BackgroundIndexer implements AutoCloseable {
}
stop.set(true);
Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
if (assertNoFailuresOnStop) {
assertNoFailures();
}
}
public long totalIndexedDocs() {
return ids.size();
@ -308,6 +307,10 @@ public class BackgroundIndexer implements AutoCloseable {
maxFieldSize = fieldSize;
}
public void setAssertNoFailuresOnStop(final boolean assertNoFailuresOnStop) {
this.assertNoFailuresOnStop = assertNoFailuresOnStop;
}
@Override
public void close() throws Exception {
stop();

View File

@ -129,7 +129,7 @@ public class ElasticsearchAssertions {
* @param builder the request builder
*/
public static void assertBlocked(ActionRequestBuilder builder) {
assertBlocked(builder, null);
assertBlocked(builder, (ClusterBlock) null);
}
/**
@ -155,9 +155,9 @@ public class ElasticsearchAssertions {
* Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}.
*
* @param builder the request builder
* @param expectedBlock the expected block
* @param expectedBlockId the expected block id
*/
public static void assertBlocked(ActionRequestBuilder builder, ClusterBlock expectedBlock) {
public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final Integer expectedBlockId) {
try {
builder.get();
fail("Request executed with success but a ClusterBlockException was expected");
@ -165,19 +165,29 @@ public class ElasticsearchAssertions {
assertThat(e.blocks().size(), greaterThan(0));
assertThat(e.status(), equalTo(RestStatus.FORBIDDEN));
if (expectedBlock != null) {
if (expectedBlockId != null) {
boolean found = false;
for (ClusterBlock clusterBlock : e.blocks()) {
if (clusterBlock.id() == expectedBlock.id()) {
if (clusterBlock.id() == expectedBlockId) {
found = true;
break;
}
}
assertThat("Request should have been blocked by [" + expectedBlock + "] instead of " + e.blocks(), found, equalTo(true));
assertThat("Request should have been blocked by [" + expectedBlockId + "] instead of " + e.blocks(), found, equalTo(true));
}
}
}
/**
* Executes the request and fails if the request has not been blocked by a specific {@link ClusterBlock}.
*
* @param builder the request builder
* @param expectedBlock the expected block
*/
public static void assertBlocked(final ActionRequestBuilder builder, @Nullable final ClusterBlock expectedBlock) {
assertBlocked(builder, expectedBlock != null ? expectedBlock.id() : null);
}
public static String formatShardStatus(BroadcastResponse response) {
StringBuilder msg = new StringBuilder();
msg.append(" Total shards: ").append(response.getTotalShards())

View File

@ -5,11 +5,14 @@
*/
package org.elasticsearch.xpack.core.action;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.ActionFilters;
@ -54,17 +57,20 @@ public final class TransportFreezeIndexAction extends
private final DestructiveOperations destructiveOperations;
private final MetaDataIndexStateService indexStateService;
private final TransportCloseIndexAction transportCloseIndexAction;
@Inject
public TransportFreezeIndexAction(MetaDataIndexStateService indexStateService, TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
DestructiveOperations destructiveOperations) {
DestructiveOperations destructiveOperations,
TransportCloseIndexAction transportCloseIndexAction) {
super(FreezeIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
FreezeRequest::new);
this.destructiveOperations = destructiveOperations;
this.indexStateService = indexStateService;
this.transportCloseIndexAction = transportCloseIndexAction;
}
@Override
protected String executor() {
@ -103,11 +109,44 @@ public final class TransportFreezeIndexAction extends
@Override
protected void masterOperation(FreezeRequest request, ClusterState state, ActionListener<FreezeResponse> listener) {
throw new UnsupportedOperationException("The task parameter is required");
}
@Override
protected void masterOperation(Task task, TransportFreezeIndexAction.FreezeRequest request, ClusterState state,
ActionListener<TransportFreezeIndexAction.FreezeResponse> listener) throws Exception {
final Index[] concreteIndices = resolveIndices(request, state);
if (concreteIndices.length == 0) {
listener.onResponse(new FreezeResponse(true, true));
return;
}
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest(task.getId())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(final AcknowledgedResponse response) {
if (response.isAcknowledged()) {
toggleFrozenSettings(concreteIndices, request, listener);
} else {
// TODO improve FreezeResponse so that it also reports failures from the close index API
listener.onResponse(new FreezeResponse(false, false));
}
}
@Override
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
});
}
private void toggleFrozenSettings(final Index[] concreteIndices, final FreezeRequest request,
final ActionListener<FreezeResponse> listener) {
clusterService.submitStateUpdateTask("toggle-frozen-settings",
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, new ActionListener<AcknowledgedResponse>() {
@Override
@ -136,14 +175,6 @@ public final class TransportFreezeIndexAction extends
}) {
@Override
public ClusterState execute(ClusterState currentState) {
List<Index> toClose = new ArrayList<>();
for (Index index : concreteIndices) {
IndexMetaData metaData = currentState.metaData().index(index);
if (metaData.getState() != IndexMetaData.State.CLOSE) {
toClose.add(index);
}
}
currentState = indexStateService.closeIndices(currentState, toClose.toArray(new Index[0]), toClose.toString());
final MetaData.Builder builder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
for (Index index : concreteIndices) {

View File

@ -61,7 +61,7 @@ public final class IndexPrivilege extends Privilege {
ClusterSearchShardsAction.NAME, TypesExistsAction.NAME, ValidateQueryAction.NAME + "*", GetSettingsAction.NAME,
ExplainLifecycleAction.NAME);
private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME, UnfollowAction.NAME,
CloseIndexAction.NAME);
CloseIndexAction.NAME + "*");
private static final Automaton MANAGE_ILM_AUTOMATON = patterns("indices:admin/ilm/*");
public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY);