[Close Index API] Refactor MetaDataIndexStateService (#36354)

The commit changes how indices are closed in the MetaDataIndexStateService. 
It now uses a 3 steps process where writes are blocked on indices to be closed, 
then some verifications are done on shards using the TransportVerifyShardBeforeCloseAction 
added in #36249, and finally indices states are moved to CLOSE and their routing 
tables removed.

The closing process also takes care of using the pre-7.0 way to close indices if the 
cluster contains mixed version of nodes and a node does not support the TransportVerifyShardBeforeCloseAction. It also closes unassigned indices.

Related to #33888
This commit is contained in:
Tanguy Leroux 2018-12-13 17:36:23 +01:00 committed by GitHub
parent 40953d70ca
commit 8e5dd20efb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 1140 additions and 161 deletions

View File

@ -20,12 +20,13 @@
package org.elasticsearch.test.rest; package org.elasticsearch.test.rest;
import org.apache.http.util.EntityUtils; import org.apache.http.util.EntityUtils;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.action.ActionFuture; import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response; import org.elasticsearch.client.Response;
import org.elasticsearch.client.ResponseException; import org.elasticsearch.client.ResponseException;
import org.elasticsearch.client.ResponseListener; import org.elasticsearch.client.ResponseListener;
import org.elasticsearch.client.Request;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -40,6 +41,7 @@ import static org.hamcrest.Matchers.instanceOf;
/** /**
* Tests that wait for refresh is fired if the index is closed. * Tests that wait for refresh is fired if the index is closed.
*/ */
@LuceneTestCase.AwaitsFix(bugUrl = "to be created")
public class WaitForRefreshAndCloseIT extends ESRestTestCase { public class WaitForRefreshAndCloseIT extends ESRestTestCase {
@Before @Before
public void setupIndex() throws IOException { public void setupIndex() throws IOException {

View File

@ -25,7 +25,7 @@ import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
*/ */
public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> { public class CloseIndexClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<CloseIndexClusterStateUpdateRequest> {
CloseIndexClusterStateUpdateRequest() { public CloseIndexClusterStateUpdateRequest() {
} }
} }

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -105,19 +104,21 @@ public class TransportCloseIndexAction extends TransportMasterNodeAction<CloseIn
listener.onResponse(new AcknowledgedResponse(true)); listener.onResponse(new AcknowledgedResponse(true));
return; return;
} }
CloseIndexClusterStateUpdateRequest updateRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout()) final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices); .indices(concreteIndices);
indexStateService.closeIndices(updateRequest, new ActionListener<ClusterStateUpdateResponse>() { indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {
@Override @Override
public void onResponse(ClusterStateUpdateResponse response) { public void onResponse(final AcknowledgedResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged())); listener.onResponse(response);
} }
@Override @Override
public void onFailure(Exception t) { public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t); logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t); listener.onFailure(t);
} }

View File

@ -40,10 +40,10 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.TransportService;
public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction< public class TransportVerifyShardBeforeCloseAction extends TransportReplicationAction<
TransportVerifyShardBeforeCloseAction.ShardCloseRequest, TransportVerifyShardBeforeCloseAction.ShardCloseRequest, ReplicationResponse> { TransportVerifyShardBeforeCloseAction.ShardRequest, TransportVerifyShardBeforeCloseAction.ShardRequest, ReplicationResponse> {
public static final String NAME = CloseIndexAction.NAME + "[s]"; public static final String NAME = CloseIndexAction.NAME + "[s]";
private static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK; public static final ClusterBlock EXPECTED_BLOCK = MetaDataIndexStateService.INDEX_CLOSED_BLOCK;
@Inject @Inject
public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService, public TransportVerifyShardBeforeCloseAction(final Settings settings, final TransportService transportService,
@ -51,7 +51,7 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
final ThreadPool threadPool, final ShardStateAction stateAction, final ThreadPool threadPool, final ShardStateAction stateAction,
final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) { final ActionFilters actionFilters, final IndexNameExpressionResolver resolver) {
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver, super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters, resolver,
ShardCloseRequest::new, ShardCloseRequest::new, ThreadPool.Names.MANAGEMENT); ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
} }
@Override @Override
@ -61,14 +61,14 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
@Override @Override
protected void acquirePrimaryOperationPermit(final IndexShard primary, protected void acquirePrimaryOperationPermit(final IndexShard primary,
final ShardCloseRequest request, final ShardRequest request,
final ActionListener<Releasable> onAcquired) { final ActionListener<Releasable> onAcquired) {
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout()); primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
} }
@Override @Override
protected void acquireReplicaOperationPermit(final IndexShard replica, protected void acquireReplicaOperationPermit(final IndexShard replica,
final ShardCloseRequest request, final ShardRequest request,
final ActionListener<Releasable> onAcquired, final ActionListener<Releasable> onAcquired,
final long primaryTerm, final long primaryTerm,
final long globalCheckpoint, final long globalCheckpoint,
@ -77,14 +77,14 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
} }
@Override @Override
protected PrimaryResult<ShardCloseRequest, ReplicationResponse> shardOperationOnPrimary(final ShardCloseRequest shardRequest, protected PrimaryResult<ShardRequest, ReplicationResponse> shardOperationOnPrimary(final ShardRequest shardRequest,
final IndexShard primary) throws Exception { final IndexShard primary) throws Exception {
executeShardOperation(primary); executeShardOperation(primary);
return new PrimaryResult<>(shardRequest, new ReplicationResponse()); return new PrimaryResult<>(shardRequest, new ReplicationResponse());
} }
@Override @Override
protected ReplicaResult shardOperationOnReplica(final ShardCloseRequest shardRequest, final IndexShard replica) throws Exception { protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception {
executeShardOperation(replica); executeShardOperation(replica);
return new ReplicaResult(); return new ReplicaResult();
} }
@ -109,18 +109,18 @@ public class TransportVerifyShardBeforeCloseAction extends TransportReplicationA
logger.debug("{} shard is ready for closing", shardId); logger.debug("{} shard is ready for closing", shardId);
} }
public static class ShardCloseRequest extends ReplicationRequest<ShardCloseRequest> { public static class ShardRequest extends ReplicationRequest<ShardRequest> {
ShardCloseRequest(){ ShardRequest(){
} }
public ShardCloseRequest(final ShardId shardId) { public ShardRequest(final ShardId shardId) {
super(shardId); super(shardId);
} }
@Override @Override
public String toString() { public String toString() {
return "close shard {" + shardId + "}"; return "verify shard before close {" + shardId + "}";
} }
} }
} }

View File

@ -19,28 +19,46 @@
package org.elasticsearch.cluster.metadata; package org.elasticsearch.cluster.metadata;
import com.carrotsearch.hppc.cursors.IntObjectCursor;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.NotifyOnceListener;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.support.ActiveShardsObserver; import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse; import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse; import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock; import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel; import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority; import org.elasticsearch.common.Priority;
import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.indices.IndicesService; import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.RestoreService; import org.elasticsearch.snapshots.RestoreService;
@ -51,8 +69,13 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import static java.util.Collections.unmodifiableMap;
/** /**
* Service responsible for submitting open/close index requests * Service responsible for submitting open/close index requests
@ -64,50 +87,118 @@ public class MetaDataIndexStateService {
false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE); false, false, RestStatus.FORBIDDEN, ClusterBlockLevel.READ_WRITE);
private final ClusterService clusterService; private final ClusterService clusterService;
private final AllocationService allocationService; private final AllocationService allocationService;
private final MetaDataIndexUpgradeService metaDataIndexUpgradeService; private final MetaDataIndexUpgradeService metaDataIndexUpgradeService;
private final IndicesService indicesService; private final IndicesService indicesService;
private final ThreadPool threadPool;
private final TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction;
private final ActiveShardsObserver activeShardsObserver; private final ActiveShardsObserver activeShardsObserver;
@Inject @Inject
public MetaDataIndexStateService(ClusterService clusterService, AllocationService allocationService, public MetaDataIndexStateService(ClusterService clusterService, AllocationService allocationService,
MetaDataIndexUpgradeService metaDataIndexUpgradeService, MetaDataIndexUpgradeService metaDataIndexUpgradeService,
IndicesService indicesService, ThreadPool threadPool) { IndicesService indicesService, ThreadPool threadPool,
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction) {
this.indicesService = indicesService; this.indicesService = indicesService;
this.clusterService = clusterService; this.clusterService = clusterService;
this.allocationService = allocationService; this.allocationService = allocationService;
this.threadPool = threadPool;
this.transportVerifyShardBeforeCloseAction = transportVerifyShardBeforeCloseAction;
this.metaDataIndexUpgradeService = metaDataIndexUpgradeService; this.metaDataIndexUpgradeService = metaDataIndexUpgradeService;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool); this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
} }
public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener<ClusterStateUpdateResponse> listener) { /**
if (request.indices() == null || request.indices().length == 0) { * Closes one or more indices.
*
* Closing indices is a 3 steps process: it first adds a write block to every indices to close, then waits for the operations on shards
* to be terminated and finally closes the indices by moving their state to CLOSE.
*/
public void closeIndices(final CloseIndexClusterStateUpdateRequest request, final ActionListener<AcknowledgedResponse> listener) {
final Index[] concreteIndices = request.indices();
if (concreteIndices == null || concreteIndices.length == 0) {
throw new IllegalArgumentException("Index name is required"); throw new IllegalArgumentException("Index name is required");
} }
final String indicesAsString = Arrays.toString(request.indices()); final TimeValue timeout = request.ackTimeout();
clusterService.submitStateUpdateTask("close-indices " + indicesAsString, final TimeValue masterTimeout = request.masterNodeTimeout();
new AckedClusterStateUpdateTask<ClusterStateUpdateResponse>(Priority.URGENT, request, listener) {
clusterService.submitStateUpdateTask("add-block-index-to-close " + Arrays.toString(concreteIndices),
new ClusterStateUpdateTask(Priority.URGENT) {
private final Set<Index> blockedIndices = new HashSet<>();
@Override @Override
protected ClusterStateUpdateResponse newResponse(boolean acknowledged) { public ClusterState execute(final ClusterState currentState) {
return new ClusterStateUpdateResponse(acknowledged); return addIndexClosedBlocks(concreteIndices, currentState, blockedIndices);
} }
@Override @Override
public ClusterState execute(ClusterState currentState) { public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
return closeIndices(currentState, request.indices(), indicesAsString); if (oldState == newState) {
} assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
}); listener.onResponse(new AcknowledgedResponse(true));
} else {
assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
threadPool.executor(ThreadPool.Names.MANAGEMENT)
.execute(new WaitForClosedBlocksApplied(blockedIndices, timeout,
ActionListener.wrap(closedBlocksResults ->
clusterService.submitStateUpdateTask("close-indices", new ClusterStateUpdateTask(Priority.URGENT) {
@Override
public ClusterState execute(final ClusterState currentState) throws Exception {
final ClusterState updatedState = closeRoutingTable(currentState, closedBlocksResults);
return allocationService.reroute(updatedState, "indices closed");
} }
public ClusterState closeIndices(ClusterState currentState, final Index[] indices, String indicesAsString) { @Override
Set<IndexMetaData> indicesToClose = new HashSet<>(); public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(final String source,
final ClusterState oldState, final ClusterState newState) {
boolean acknowledged = closedBlocksResults.values().stream()
.allMatch(AcknowledgedResponse::isAcknowledged);
listener.onResponse(new AcknowledgedResponse(acknowledged));
}
}),
listener::onFailure)
)
);
}
}
@Override
public void onFailure(final String source, final Exception e) {
listener.onFailure(e);
}
@Override
public TimeValue timeout() {
return masterTimeout;
}
}
);
}
/**
* Step 1 - Start closing indices by adding a write block
*
* This step builds the list of indices to close (the ones explicitly requested that are not in CLOSE state) and adds the index block
* {@link #INDEX_CLOSED_BLOCK} to every index to close in the cluster state. After the cluster state is published, the shards should
* start to reject writing operations and we can proceed with step 2.
*/
static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState currentState, final Set<Index> blockedIndices) {
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
final Set<IndexMetaData> indicesToClose = new HashSet<>();
for (Index index : indices) { for (Index index : indices) {
final IndexMetaData indexMetaData = currentState.metaData().getIndexSafe(index); final IndexMetaData indexMetaData = metadata.getSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) { if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
indicesToClose.add(indexMetaData); indicesToClose.add(indexMetaData);
} else {
logger.debug("index {} is already closed, ignoring", index);
} }
} }
@ -119,28 +210,174 @@ public class MetaDataIndexStateService {
RestoreService.checkIndexClosing(currentState, indicesToClose); RestoreService.checkIndexClosing(currentState, indicesToClose);
// Check if index closing conflicts with any running snapshots // Check if index closing conflicts with any running snapshots
SnapshotsService.checkIndexClosing(currentState, indicesToClose); SnapshotsService.checkIndexClosing(currentState, indicesToClose);
logger.info("closing indices [{}]", indicesAsString);
MetaData.Builder mdBuilder = MetaData.builder(currentState.metaData()); // If the cluster is in a mixed version that does not support the shard close action,
ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder() // we use the previous way to close indices and directly close them without sanity checks
.blocks(currentState.blocks()); final boolean useDirectClose = currentState.nodes().getMinNodeVersion().before(Version.V_7_0_0);
for (IndexMetaData openIndexMetadata : indicesToClose) {
final String indexName = openIndexMetadata.getIndex().getName(); final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
mdBuilder.put(IndexMetaData.builder(openIndexMetadata).state(IndexMetaData.State.CLOSE)); final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
blocksBuilder.addIndexBlock(indexName, INDEX_CLOSED_BLOCK);
for (IndexMetaData indexToClose : indicesToClose) {
final Index index = indexToClose.getIndex();
if (currentState.blocks().hasIndexBlock(index.getName(), INDEX_CLOSED_BLOCK) == false) {
blocks.addIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
}
if (useDirectClose) {
logger.debug("closing index {} directly", index);
metadata.put(IndexMetaData.builder(indexToClose).state(IndexMetaData.State.CLOSE));
routingTable.remove(index.getName());
}
blockedIndices.add(index);
} }
ClusterState updatedState = ClusterState.builder(currentState).metaData(mdBuilder).blocks(blocksBuilder).build(); logger.info(() -> new ParameterizedMessage("closing indices {}",
blockedIndices.stream().map(Object::toString).collect(Collectors.joining(","))));
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable()); return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
for (IndexMetaData index : indicesToClose) {
rtBuilder.remove(index.getIndex().getName());
} }
//no explicit wait for other nodes needed as we use AckedClusterStateUpdateTask /**
return allocationService.reroute( * Step 2 - Wait for indices to be ready for closing
ClusterState.builder(updatedState).routingTable(rtBuilder.build()).build(), * <p>
"indices closed [" + indicesAsString + "]"); * This step iterates over the indices previously blocked and sends a {@link TransportVerifyShardBeforeCloseAction} to each shard. If
* this action succeed then the shard is considered to be ready for closing. When all shards of a given index are ready for closing,
* the index is considered ready to be closed.
*/
class WaitForClosedBlocksApplied extends AbstractRunnable {
private final Set<Index> blockedIndices;
private final @Nullable TimeValue timeout;
private final ActionListener<Map<Index, AcknowledgedResponse>> listener;
private WaitForClosedBlocksApplied(final Set<Index> blockedIndices,
final @Nullable TimeValue timeout,
final ActionListener<Map<Index, AcknowledgedResponse>> listener) {
if (blockedIndices == null || blockedIndices.isEmpty()) {
throw new IllegalArgumentException("Cannot wait for closed block to be applied to null or empty list of blocked indices");
}
this.blockedIndices = blockedIndices;
this.listener = listener;
this.timeout = timeout;
}
@Override
public void onFailure(final Exception e) {
listener.onFailure(e);
}
@Override
protected void doRun() throws Exception {
final Map<Index, AcknowledgedResponse> results = ConcurrentCollections.newConcurrentMap();
final CountDown countDown = new CountDown(blockedIndices.size());
final ClusterState state = clusterService.state();
for (Index blockedIndex : blockedIndices) {
waitForShardsReadyForClosing(blockedIndex, state, timeout, response -> {
results.put(blockedIndex, response);
if (countDown.countDown()) {
listener.onResponse(unmodifiableMap(results));
}
});
}
}
private void waitForShardsReadyForClosing(final Index index, final ClusterState state, @Nullable final TimeValue timeout,
final Consumer<AcknowledgedResponse> onResponse) {
final IndexMetaData indexMetaData = state.metaData().index(index);
if (indexMetaData == null) {
logger.debug("index {} has been blocked before closing and is now deleted, ignoring", index);
onResponse.accept(new AcknowledgedResponse(true));
return;
}
final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
if (indexRoutingTable == null || indexMetaData.getState() == IndexMetaData.State.CLOSE) {
logger.debug("index {} has been blocked before closing and is already closed, ignoring", index);
onResponse.accept(new AcknowledgedResponse(true));
return;
}
final ImmutableOpenIntMap<IndexShardRoutingTable> shards = indexRoutingTable.getShards();
final AtomicArray<AcknowledgedResponse> results = new AtomicArray<>(shards.size());
final CountDown countDown = new CountDown(shards.size());
for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
final IndexShardRoutingTable shardRoutingTable = shard.value;
final ShardId shardId = shardRoutingTable.shardId();
sendVerifyShardBeforeCloseRequest(shardRoutingTable, timeout, new NotifyOnceListener<ReplicationResponse>() {
@Override
public void innerOnResponse(final ReplicationResponse replicationResponse) {
ReplicationResponse.ShardInfo shardInfo = replicationResponse.getShardInfo();
results.setOnce(shardId.id(), new AcknowledgedResponse(shardInfo.getFailed() == 0));
processIfFinished();
}
@Override
public void innerOnFailure(final Exception e) {
results.setOnce(shardId.id(), new AcknowledgedResponse(false));
processIfFinished();
}
private void processIfFinished() {
if (countDown.countDown()) {
final boolean acknowledged = results.asList().stream().allMatch(AcknowledgedResponse::isAcknowledged);
onResponse.accept(new AcknowledgedResponse(acknowledged));
}
}
});
}
}
private void sendVerifyShardBeforeCloseRequest(final IndexShardRoutingTable shardRoutingTable, @Nullable final TimeValue timeout,
final ActionListener<ReplicationResponse> listener) {
final ShardId shardId = shardRoutingTable.shardId();
if (shardRoutingTable.primaryShard().unassigned()) {
logger.debug("primary shard {} is unassigned, ignoring", shardId);
final ReplicationResponse response = new ReplicationResponse();
response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size()));
listener.onResponse(response);
return;
}
final TransportVerifyShardBeforeCloseAction.ShardRequest shardRequest =
new TransportVerifyShardBeforeCloseAction.ShardRequest(shardId);
if (timeout != null) {
shardRequest.timeout(timeout);
}
// TODO propagate a task id from the parent CloseIndexRequest to the ShardCloseRequests
transportVerifyShardBeforeCloseAction.execute(shardRequest, listener);
}
}
/**
* Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing.
*/
static ClusterState closeRoutingTable(final ClusterState currentState, final Map<Index, AcknowledgedResponse> results) {
final MetaData.Builder metadata = MetaData.builder(currentState.metaData());
final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
final Set<String> closedIndices = new HashSet<>();
for (Map.Entry<Index, AcknowledgedResponse> result : results.entrySet()) {
final Index index = result.getKey();
try {
final IndexMetaData indexMetaData = metadata.getSafe(index);
if (indexMetaData.getState() != IndexMetaData.State.CLOSE) {
if (result.getValue().isAcknowledged()) {
logger.debug("closing index {} succeed, removing index routing table", index);
metadata.put(IndexMetaData.builder(indexMetaData).state(IndexMetaData.State.CLOSE));
routingTable.remove(index.getName());
closedIndices.add(index.getName());
} else {
logger.debug("closing index {} failed, removing index block because: {}", index, result.getValue());
blocks.removeIndexBlock(index.getName(), INDEX_CLOSED_BLOCK);
}
} else {
logger.debug("index {} has been closed since it was blocked before closing, ignoring", index);
}
} catch (final IndexNotFoundException e) {
logger.debug("index {} has been deleted since it was blocked before closing, ignoring", index);
}
}
logger.info("completed closing of indices {}", closedIndices);
return ClusterState.builder(currentState).blocks(blocks).metaData(metadata).routingTable(routingTable.build()).build();
} }
public void openIndex(final OpenIndexClusterStateUpdateRequest request, public void openIndex(final OpenIndexClusterStateUpdateRequest request,
@ -250,7 +487,6 @@ public class MetaDataIndexStateService {
ex.addValidationError(error.get()); ex.addValidationError(error.get());
throw ex; throw ex;
} }
} }
private static int getTotalShardCount(ClusterState state, Index index) { private static int getTotalShardCount(ClusterState state, Index index) {

View File

@ -74,8 +74,8 @@ public class TransportVerifyShardBeforeCloseActionTests extends ESTestCase {
} }
private void executeOnPrimaryOrReplica() throws Exception { private void executeOnPrimaryOrReplica() throws Exception {
final TransportVerifyShardBeforeCloseAction.ShardCloseRequest request = final TransportVerifyShardBeforeCloseAction.ShardRequest request =
new TransportVerifyShardBeforeCloseAction.ShardCloseRequest(indexShard.shardId()); new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId());
if (randomBoolean()) { if (randomBoolean()) {
assertNotNull(action.shardOperationOnPrimary(request, indexShard)); assertNotNull(action.shardOperationOnPrimary(request, indexShard));
} else { } else {

View File

@ -20,27 +20,186 @@
package org.elasticsearch.cluster.metadata; package org.elasticsearch.cluster.metadata;
import org.elasticsearch.Version; import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.shards.ClusterShardLimitIT; import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ValidationException; import org.elasticsearch.common.ValidationException;
import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index; import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS;
import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_VERSION_CREATED;
import static org.elasticsearch.cluster.routing.TestShardRouting.newShardRouting;
import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount; import static org.elasticsearch.cluster.shards.ClusterShardLimitIT.ShardCounts.forDataNodeCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class MetaDataIndexStateServiceTests extends ESTestCase { public class MetaDataIndexStateServiceTests extends ESTestCase {
public void testCloseRoutingTable() {
final Set<Index> nonBlockedIndices = new HashSet<>();
final Map<Index, AcknowledgedResponse> blockedIndices = new HashMap<>();
ClusterState state = ClusterState.builder(new ClusterName("testCloseRoutingTable")).build();
for (int i = 0; i < randomIntBetween(1, 25); i++) {
final String indexName = randomAlphaOfLengthBetween(5, 15);
if (randomBoolean()) {
state = addOpenedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
nonBlockedIndices.add(state.metaData().index(indexName).getIndex());
} else {
state = addBlockedIndex(indexName, randomIntBetween(1, 5), randomIntBetween(0, 5), state);
blockedIndices.put(state.metaData().index(indexName).getIndex(), new AcknowledgedResponse(randomBoolean()));
}
}
final ClusterState updatedState = MetaDataIndexStateService.closeRoutingTable(state, blockedIndices);
assertThat(updatedState.metaData().indices().size(), equalTo(nonBlockedIndices.size() + blockedIndices.size()));
for (Index nonBlockedIndex : nonBlockedIndices) {
assertIsOpened(nonBlockedIndex.getName(), updatedState);
}
for (Map.Entry<Index, AcknowledgedResponse> blockedIndex : blockedIndices.entrySet()) {
if (blockedIndex.getValue().isAcknowledged()) {
assertIsClosed(blockedIndex.getKey().getName(), updatedState);
} else {
assertIsOpened(blockedIndex.getKey().getName(), updatedState);
}
}
}
public void testAddIndexClosedBlocks() {
final ClusterState initialState = ClusterState.builder(new ClusterName("testAddIndexClosedBlocks")).build();
{
final Set<Index> blockedIndices = new HashSet<>();
expectThrows(IndexNotFoundException.class, () ->
MetaDataIndexStateService.addIndexClosedBlocks(new Index[]{new Index("_name", "_uid")}, initialState, blockedIndices));
assertTrue(blockedIndices.isEmpty());
}
{
final Set<Index> blockedIndices = new HashSet<>();
Index[] indices = Index.EMPTY_ARRAY;
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, initialState, blockedIndices);
assertSame(initialState, updatedState);
assertTrue(blockedIndices.isEmpty());
}
{
final Set<Index> blockedIndices = new HashSet<>();
ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
Index[] indices = new Index[]{state.metaData().index("closed").getIndex()};
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
assertSame(state, updatedState);
assertTrue(blockedIndices.isEmpty());
}
{
final Set<Index> blockedIndices = new HashSet<>();
ClusterState state = addClosedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
Index[] indices = new Index[]{state.metaData().index("opened").getIndex(), state.metaData().index("closed").getIndex()};
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
assertNotSame(state, updatedState);
assertTrue(blockedIndices.contains(updatedState.metaData().index("opened").getIndex()));
assertFalse(blockedIndices.contains(updatedState.metaData().index("closed").getIndex()));
assertIsBlocked("opened", updatedState, true);
}
{
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
ClusterState state = addRestoredIndex("restored", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
if (randomBoolean()) {
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
if (randomBoolean()) {
state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
Index[] indices = new Index[]{state.metaData().index("restored").getIndex()};
MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>());
});
assertThat(exception.getMessage(), containsString("Cannot close indices that are being restored: [[restored]]"));
}
{
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> {
ClusterState state = addSnapshotIndex("snapshotted", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
if (randomBoolean()) {
state = addOpenedIndex("opened", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
if (randomBoolean()) {
state = addOpenedIndex("closed", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
}
Index[] indices = new Index[]{state.metaData().index("snapshotted").getIndex()};
MetaDataIndexStateService.addIndexClosedBlocks(indices, state, new HashSet<>());
});
assertThat(exception.getMessage(), containsString("Cannot close indices that are being snapshotted: [[snapshotted]]"));
}
{
final Set<Index> blockedIndices = new HashSet<>();
ClusterState state = addOpenedIndex("index-1", randomIntBetween(1, 3), randomIntBetween(0, 3), initialState);
state = addOpenedIndex("index-2", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
state = addOpenedIndex("index-3", randomIntBetween(1, 3), randomIntBetween(0, 3), state);
final boolean mixedVersions = randomBoolean();
if (mixedVersions) {
state = ClusterState.builder(state)
.nodes(DiscoveryNodes.builder(state.nodes())
.add(new DiscoveryNode("old_node", buildNewFakeTransportAddress(), Collections.emptyMap(),
new HashSet<>(Arrays.asList(DiscoveryNode.Role.values())), Version.V_6_0_0)))
.build();
}
Index[] indices = new Index[]{state.metaData().index("index-1").getIndex(),
state.metaData().index("index-2").getIndex(), state.metaData().index("index-3").getIndex()};
ClusterState updatedState = MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
assertNotSame(state, updatedState);
assertTrue(blockedIndices.contains(updatedState.metaData().index("index-1").getIndex()));
assertTrue(blockedIndices.contains(updatedState.metaData().index("index-2").getIndex()));
assertTrue(blockedIndices.contains(updatedState.metaData().index("index-3").getIndex()));
if (mixedVersions) {
assertIsClosed("index-1", updatedState);
assertIsClosed("index-2", updatedState);
assertIsClosed("index-2", updatedState);
} else {
assertIsBlocked("index-1", updatedState, true);
assertIsBlocked("index-2", updatedState, true);
assertIsBlocked("index-3", updatedState, true);
}
}
}
public void testValidateShardLimit() { public void testValidateShardLimit() {
int nodesInCluster = randomIntBetween(2,100); int nodesInCluster = randomIntBetween(2,100);
ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster); ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
@ -55,7 +214,6 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
.collect(Collectors.toList()) .collect(Collectors.toList())
.toArray(new Index[2]); .toArray(new Index[2]);
DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas()); int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas()); int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
int maxShards = counts.getShardsPerNode() * nodesInCluster; int maxShards = counts.getShardsPerNode() * nodesInCluster;
@ -74,27 +232,110 @@ public class MetaDataIndexStateServiceTests extends ESTestCase {
DiscoveryNodes nodes = mock(DiscoveryNodes.class); DiscoveryNodes nodes = mock(DiscoveryNodes.class);
when(nodes.getDataNodes()).thenReturn(dataNodes.build()); when(nodes.getDataNodes()).thenReturn(dataNodes.build());
IndexMetaData.Builder openIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15)) ClusterState state = ClusterState.builder(ClusterName.DEFAULT).build();
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT)) state = addOpenedIndex(randomAlphaOfLengthBetween(5, 15), openIndexShards, openIndexReplicas, state);
.creationDate(randomLong()) state = addClosedIndex(randomAlphaOfLengthBetween(5, 15), closedIndexShards, closedIndexReplicas, state);
.numberOfShards(openIndexShards)
.numberOfReplicas(openIndexReplicas); final MetaData.Builder metaData = MetaData.builder(state.metaData());
IndexMetaData.Builder closedIndexMetaData = IndexMetaData.builder(randomAlphaOfLengthBetween(5, 15))
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.creationDate(randomLong())
.state(IndexMetaData.State.CLOSE)
.numberOfShards(closedIndexShards)
.numberOfReplicas(closedIndexReplicas);
MetaData.Builder metaData = MetaData.builder().put(openIndexMetaData).put(closedIndexMetaData);
if (randomBoolean()) { if (randomBoolean()) {
metaData.persistentSettings(clusterSettings); metaData.persistentSettings(clusterSettings);
} else { } else {
metaData.transientSettings(clusterSettings); metaData.transientSettings(clusterSettings);
} }
return ClusterState.builder(state).metaData(metaData).nodes(nodes).build();
}
return ClusterState.builder(ClusterName.DEFAULT) private static ClusterState addOpenedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
.metaData(metaData) return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, null);
.nodes(nodes) }
private static ClusterState addClosedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.CLOSE, MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
}
private static ClusterState addBlockedIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
return addIndex(state, index, numShards, numReplicas, IndexMetaData.State.OPEN, MetaDataIndexStateService.INDEX_CLOSED_BLOCK);
}
private static ClusterState addRestoredIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
ClusterState newState = addOpenedIndex(index, numShards, numReplicas, state);
final ImmutableOpenMap.Builder<ShardId, RestoreInProgress.ShardRestoreStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) {
shardsBuilder.put(shardRouting.shardId(), new RestoreInProgress.ShardRestoreStatus(shardRouting.currentNodeId()));
}
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
final RestoreInProgress.Entry entry =
new RestoreInProgress.Entry(snapshot, RestoreInProgress.State.INIT, Collections.singletonList(index), shardsBuilder.build());
return ClusterState.builder(newState).putCustom(RestoreInProgress.TYPE, new RestoreInProgress(entry)).build();
}
private static ClusterState addSnapshotIndex(final String index, final int numShards, final int numReplicas, final ClusterState state) {
ClusterState newState = addOpenedIndex(index, numShards, numReplicas, state);
final ImmutableOpenMap.Builder<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ShardRouting shardRouting : newState.routingTable().index(index).randomAllActiveShardsIt()) {
shardsBuilder.put(shardRouting.shardId(), new SnapshotsInProgress.ShardSnapshotStatus(shardRouting.currentNodeId()));
}
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
final SnapshotsInProgress.Entry entry =
new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build());
return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(entry)).build();
}
private static ClusterState addIndex(final ClusterState currentState,
final String index,
final int numShards,
final int numReplicas,
final IndexMetaData.State state,
@Nullable final ClusterBlock block) {
final IndexMetaData indexMetaData = IndexMetaData.builder(index)
.state(state)
.creationDate(randomNonNegativeLong())
.settings(Settings.builder()
.put(SETTING_VERSION_CREATED, Version.CURRENT)
.put(SETTING_NUMBER_OF_SHARDS, numShards)
.put(SETTING_NUMBER_OF_REPLICAS, numReplicas))
.build(); .build();
final ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
clusterStateBuilder.metaData(MetaData.builder(currentState.metaData()).put(indexMetaData, true));
if (state == IndexMetaData.State.OPEN) {
final IndexRoutingTable.Builder indexRoutingTable = IndexRoutingTable.builder(indexMetaData.getIndex());
for (int j = 0; j < indexMetaData.getNumberOfShards(); j++) {
ShardId shardId = new ShardId(indexMetaData.getIndex(), j);
IndexShardRoutingTable.Builder indexShardRoutingBuilder = new IndexShardRoutingTable.Builder(shardId);
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), true, ShardRoutingState.STARTED));
for (int k = 0; k < indexMetaData.getNumberOfReplicas(); k++) {
indexShardRoutingBuilder.addShard(newShardRouting(shardId, randomAlphaOfLength(10), false, ShardRoutingState.STARTED));
}
indexRoutingTable.addIndexShard(indexShardRoutingBuilder.build());
}
clusterStateBuilder.routingTable(RoutingTable.builder(currentState.routingTable()).add(indexRoutingTable).build());
}
if (block != null) {
clusterStateBuilder.blocks(ClusterBlocks.builder().blocks(currentState.blocks()).addIndexBlock(index, block));
}
return clusterStateBuilder.build();
}
private static void assertIsOpened(final String indexName, final ClusterState clusterState) {
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().index(indexName), notNullValue());
assertIsBlocked(indexName, clusterState, false);
}
private static void assertIsClosed(final String indexName, final ClusterState clusterState) {
assertThat(clusterState.metaData().index(indexName).getState(), is(IndexMetaData.State.CLOSE));
assertThat(clusterState.routingTable().index(indexName), nullValue());
assertIsBlocked(indexName, clusterState, true);
}
private static void assertIsBlocked(final String indexName, final ClusterState clusterState, final boolean blocked) {
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(blocked));
} }
} }

View File

@ -0,0 +1,46 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.index.Index;
import java.util.Map;
import java.util.Set;
public class MetaDataIndexStateServiceUtils {
private MetaDataIndexStateServiceUtils(){
}
/**
* Allows to call {@link MetaDataIndexStateService#addIndexClosedBlocks(Index[], ClusterState, Set)} which is a protected method.
*/
public static ClusterState addIndexClosedBlocks(final Index[] indices, final ClusterState state, final Set<Index> blockedIndices) {
return MetaDataIndexStateService.addIndexClosedBlocks(indices, state, blockedIndices);
}
/**
* Allows to call {@link MetaDataIndexStateService#closeRoutingTable(ClusterState, Map)} which is a protected method.
*/
public static ClusterState closeRoutingTable(final ClusterState state, final Map<Index, AcknowledgedResponse> results) {
return MetaDataIndexStateService.closeRoutingTable(state, results);
}
}

View File

@ -28,6 +28,7 @@ import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction; import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction; import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -39,6 +40,7 @@ import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeAction; import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils; import org.elasticsearch.action.support.master.TransportMasterNodeActionUtils;
@ -58,6 +60,7 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService; import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateServiceUtils;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService; import org.elasticsearch.cluster.metadata.MetaDataUpdateSettingsService;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
@ -77,6 +80,7 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment; import org.elasticsearch.env.Environment;
import org.elasticsearch.env.TestEnvironment; import org.elasticsearch.env.TestEnvironment;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.index.shard.IndexEventListener; import org.elasticsearch.index.shard.IndexEventListener;
@ -92,6 +96,8 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom; import static com.carrotsearch.randomizedtesting.RandomizedTest.getRandom;
@ -179,8 +185,11 @@ public class ClusterStateChanges {
return indexMetaData; return indexMetaData;
} }
}; };
TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
transportService, clusterService, indicesService, threadPool, null, actionFilters, indexNameExpressionResolver);
MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(clusterService, allocationService, MetaDataIndexStateService indexStateService = new MetaDataIndexStateService(clusterService, allocationService,
metaDataIndexUpgradeService, indicesService, threadPool); metaDataIndexUpgradeService, indicesService, threadPool, transportVerifyShardBeforeCloseAction);
MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(SETTINGS, clusterService, allocationService); MetaDataDeleteIndexService deleteIndexService = new MetaDataDeleteIndexService(SETTINGS, clusterService, allocationService);
MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(clusterService, MetaDataUpdateSettingsService metaDataUpdateSettingsService = new MetaDataUpdateSettingsService(clusterService,
allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool); allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, threadPool);
@ -210,7 +219,15 @@ public class ClusterStateChanges {
} }
public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) { public ClusterState closeIndices(ClusterState state, CloseIndexRequest request) {
return execute(transportCloseIndexAction, request, state); final Index[] concreteIndices = Arrays.stream(request.indices())
.map(index -> state.metaData().index(index).getIndex()).toArray(Index[]::new);
final Set<Index> blockedIndices = new HashSet<>();
ClusterState newState = MetaDataIndexStateServiceUtils.addIndexClosedBlocks(concreteIndices, state, blockedIndices);
newState = MetaDataIndexStateServiceUtils.closeRoutingTable(newState, blockedIndices.stream()
.collect(Collectors.toMap(Function.identity(), index -> new AcknowledgedResponse(true))));
return allocationService.reroute(newState, "indices closed");
} }
public ClusterState openIndices(ClusterState state, OpenIndexRequest request) { public ClusterState openIndices(ClusterState state, OpenIndexRequest request) {

View File

@ -0,0 +1,273 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.state;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaDataIndexStateService;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItem;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
public class CloseIndexIT extends ESIntegTestCase {
public void testCloseMissingIndex() {
IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().prepareClose("test").get());
assertThat(e.getMessage(), is("no such index [test]"));
}
public void testCloseOneMissingIndex() {
createIndex("test1");
final IndexNotFoundException e = expectThrows(IndexNotFoundException.class,
() -> client().admin().indices().prepareClose("test1", "test2").get());
assertThat(e.getMessage(), is("no such index [test2]"));
}
public void testCloseOneMissingIndexIgnoreMissing() {
createIndex("test1");
assertAcked(client().admin().indices().prepareClose("test1", "test2").setIndicesOptions(IndicesOptions.lenientExpandOpen()));
assertIndexIsClosed("test1");
}
public void testCloseNoIndex() {
final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class,
() -> client().admin().indices().prepareClose().get());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testCloseNullIndex() {
final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class,
() -> client().admin().indices().prepareClose((String[])null).get());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testCloseIndex() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
final int nbDocs = randomIntBetween(0, 50);
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
assertAcked(client().admin().indices().prepareOpen(indexName));
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
}
public void testCloseAlreadyClosedIndex() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
if (randomBoolean()) {
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10))
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
}
// First close should be acked
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
// Second close should be acked too
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
}
public void testCloseUnassignedIndex() {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
assertAcked(prepareCreate(indexName)
.setWaitForActiveShards(ActiveShardCount.NONE)
.setSettings(Settings.builder().put("index.routing.allocation.include._name", "nothing").build()));
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true));
assertAcked(client().admin().indices().prepareClose(indexName));
assertIndexIsClosed(indexName);
}
public void testConcurrentClose() throws InterruptedException {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
final int nbDocs = randomIntBetween(10, 50);
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
.mapToObj(i -> client().prepareIndex(indexName, "_doc", String.valueOf(i)).setSource("num", i)).collect(toList()));
final CountDownLatch startClosing = new CountDownLatch(1);
final Thread[] threads = new Thread[randomIntBetween(2, 5)];
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try {
startClosing.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
assertAcked(client().admin().indices().prepareClose(indexName));
});
threads[i].start();
}
startClosing.countDown();
for (Thread thread : threads) {
thread.join();
}
assertIndexIsClosed(indexName);
}
public void testCloseWhileIndexingDocuments() throws Exception {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
int nbDocs = 0;
try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client())) {
indexer.setAssertNoFailuresOnStop(false);
waitForDocs(randomIntBetween(10, 50), indexer);
assertAcked(client().admin().indices().prepareClose(indexName));
indexer.stop();
nbDocs += indexer.totalIndexedDocs();
final Throwable[] failures = indexer.getFailures();
if (failures != null) {
for (Throwable failure : failures) {
assertException(failure, indexName);
}
}
}
assertIndexIsClosed(indexName);
assertAcked(client().admin().indices().prepareOpen(indexName));
assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
}
public void testCloseWhileDeletingIndices() throws Exception {
final String[] indices = new String[randomIntBetween(3, 10)];
for (int i = 0; i < indices.length; i++) {
final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
createIndex(indexName);
if (randomBoolean()) {
indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, 10)
.mapToObj(n -> client().prepareIndex(indexName, "_doc", String.valueOf(n)).setSource("num", n)).collect(toList()));
}
indices[i] = indexName;
}
assertThat(client().admin().cluster().prepareState().get().getState().metaData().indices().size(), equalTo(indices.length));
final List<Thread> threads = new ArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);
for (final String indexToDelete : indices) {
threads.add(new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
try {
assertAcked(client().admin().indices().prepareDelete(indexToDelete));
} catch (final Exception e) {
assertException(e, indexToDelete);
}
}));
}
for (final String indexToClose : indices) {
threads.add(new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
try {
client().admin().indices().prepareClose(indexToClose).get();
} catch (final Exception e) {
assertException(e, indexToClose);
}
}));
}
for (Thread thread : threads) {
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
static void assertIndexIsClosed(final String indexName) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.CLOSE));
assertThat(clusterState.routingTable().index(indexName), nullValue());
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(true));
}
static void assertIndexIsOpened(final String indexName) {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
assertThat(clusterState.metaData().indices().get(indexName).getState(), is(IndexMetaData.State.OPEN));
assertThat(clusterState.routingTable().index(indexName), notNullValue());
assertThat(clusterState.blocks().hasIndexBlock(indexName, MetaDataIndexStateService.INDEX_CLOSED_BLOCK), is(false));
}
static void assertException(final Throwable throwable, final String indexName) {
final Throwable t = ExceptionsHelper.unwrapCause(throwable);
if (t instanceof ClusterBlockException) {
ClusterBlockException clusterBlockException = (ClusterBlockException) t;
assertThat(clusterBlockException.blocks(), hasSize(1));
assertThat(clusterBlockException.blocks(), hasItem(MetaDataIndexStateService.INDEX_CLOSED_BLOCK));
} else if (t instanceof IndexClosedException) {
IndexClosedException indexClosedException = (IndexClosedException) t;
assertThat(indexClosedException.getIndex(), notNullValue());
assertThat(indexClosedException.getIndex().getName(), equalTo(indexName));
} else if (t instanceof IndexNotFoundException) {
IndexNotFoundException indexNotFoundException = (IndexNotFoundException) t;
assertThat(indexNotFoundException.getIndex(), notNullValue());
assertThat(indexNotFoundException.getIndex().getName(), equalTo(indexName));
} else {
fail("Unexpected exception: " + t);
}
}
}

View File

@ -0,0 +1,195 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.indices.state;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING;
import static org.elasticsearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING;
import static org.elasticsearch.indices.state.CloseIndexIT.assertException;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsClosed;
import static org.elasticsearch.indices.state.CloseIndexIT.assertIndexIsOpened;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.greaterThan;
@ESIntegTestCase.ClusterScope(minNumDataNodes = 2)
public class CloseWhileRelocatingShardsIT extends ESIntegTestCase {
@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 10)
.put(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING.getKey(), -1)
.build();
}
@Override
protected int numberOfReplicas() {
return 1;
}
public void testCloseWhileRelocatingShards() throws Exception {
final String[] indices = new String[randomIntBetween(3, 10)];
final Map<String, Long> docsPerIndex = new HashMap<>();
for (int i = 0; i < indices.length; i++) {
final String indexName = "index-" + i;
createIndex(indexName);
int nbDocs = 0;
if (randomBoolean()) {
nbDocs = randomIntBetween(1, 20);
for (int j = 0; j < nbDocs; j++) {
IndexResponse indexResponse = client().prepareIndex(indexName, "_doc").setSource("num", j).get();
assertEquals(RestStatus.CREATED, indexResponse.status());
}
}
docsPerIndex.put(indexName, (long) nbDocs);
indices[i] = indexName;
}
ensureGreen(indices);
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder()
.put(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), EnableAllocationDecider.Rebalance.NONE.toString())));
// start some concurrent indexing threads
final Map<String, BackgroundIndexer> indexers = new HashMap<>();
for (final String index : indices) {
if (randomBoolean()) {
final BackgroundIndexer indexer = new BackgroundIndexer(index, "_doc", client());
waitForDocs(1, indexer);
indexers.put(index, indexer);
}
}
final Set<String> acknowledgedCloses = ConcurrentCollections.newConcurrentSet();
final String newNode = internalCluster().startDataOnlyNode();
try {
final CountDownLatch latch = new CountDownLatch(1);
final List<Thread> threads = new ArrayList<>();
// start shards relocating threads
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
for (final String indexToRelocate : indices) {
final IndexRoutingTable indexRoutingTable = clusterState.routingTable().index(indexToRelocate);
for (int i = 0; i < getNumShards(indexToRelocate).numPrimaries; i++) {
final int shardId = i;
ShardRouting primary = indexRoutingTable.shard(shardId).primaryShard();
assertTrue(primary.started());
ShardRouting replica = indexRoutingTable.shard(shardId).replicaShards().iterator().next();
assertTrue(replica.started());
final String currentNodeId = randomBoolean() ? primary.currentNodeId() : replica.currentNodeId();
assertNotNull(currentNodeId);
final Thread thread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
assertAcked(client().admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(indexToRelocate, shardId, currentNodeId, newNode)));
});
threads.add(thread);
thread.start();
}
}
// start index closing threads
for (final String indexToClose : indices) {
final Thread thread = new Thread(() -> {
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
AcknowledgedResponse closeResponse = client().admin().indices().prepareClose(indexToClose).get();
if (closeResponse.isAcknowledged()) {
assertTrue("Index closing should not be acknowledged twice", acknowledgedCloses.add(indexToClose));
}
});
threads.add(thread);
thread.start();
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
for (Map.Entry<String, BackgroundIndexer> entry : indexers.entrySet()) {
final BackgroundIndexer indexer = entry.getValue();
indexer.setAssertNoFailuresOnStop(false);
indexer.stop();
final String indexName = entry.getKey();
docsPerIndex.computeIfPresent(indexName, (key, value) -> value + indexer.totalIndexedDocs());
final Throwable[] failures = indexer.getFailures();
if (failures != null) {
for (Throwable failure : failures) {
assertException(failure, indexName);
}
}
}
} finally {
assertAcked(client().admin().cluster().prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey())));
}
for (String index : indices) {
if (acknowledgedCloses.contains(index)) {
assertIndexIsClosed(index);
} else {
assertIndexIsOpened(index);
}
}
assertThat("Consider that the test failed if no indices were successfully closed", acknowledgedCloses.size(), greaterThan(0));
assertAcked(client().admin().indices().prepareOpen("index-*"));
ensureGreen(indices);
for (String index : acknowledgedCloses) {
long docsCount = client().prepareSearch(index).setSize(0).get().getHits().getTotalHits().value;
assertEquals("Expected " + docsPerIndex.get(index) + " docs in index " + index + " but got " + docsCount
+ " (close acknowledged=" + acknowledgedCloses.contains(index) + ")", (long) docsPerIndex.get(index), docsCount);
}
}
}

View File

@ -72,13 +72,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertIndexIsOpened("test1"); assertIndexIsOpened("test1");
} }
public void testSimpleCloseMissingIndex() {
Client client = client();
Exception e = expectThrows(IndexNotFoundException.class, () ->
client.admin().indices().prepareClose("test1").execute().actionGet());
assertThat(e.getMessage(), is("no such index [test1]"));
}
public void testSimpleOpenMissingIndex() { public void testSimpleOpenMissingIndex() {
Client client = client(); Client client = client();
Exception e = expectThrows(IndexNotFoundException.class, () -> Exception e = expectThrows(IndexNotFoundException.class, () ->
@ -86,27 +79,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertThat(e.getMessage(), is("no such index [test1]")); assertThat(e.getMessage(), is("no such index [test1]"));
} }
public void testCloseOneMissingIndex() {
Client client = client();
createIndex("test1");
ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
Exception e = expectThrows(IndexNotFoundException.class, () ->
client.admin().indices().prepareClose("test1", "test2").execute().actionGet());
assertThat(e.getMessage(), is("no such index [test2]"));
}
public void testCloseOneMissingIndexIgnoreMissing() {
Client client = client();
createIndex("test1");
ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test1", "test2")
.setIndicesOptions(IndicesOptions.lenientExpandOpen()).execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosed("test1");
}
public void testOpenOneMissingIndex() { public void testOpenOneMissingIndex() {
Client client = client(); Client client = client();
createIndex("test1"); createIndex("test1");
@ -200,20 +172,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertIndexIsOpened("test1", "test2", "test3"); assertIndexIsOpened("test1", "test2", "test3");
} }
public void testCloseNoIndex() {
Client client = client();
Exception e = expectThrows(ActionRequestValidationException.class, () ->
client.admin().indices().prepareClose().execute().actionGet());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testCloseNullIndex() {
Client client = client();
Exception e = expectThrows(ActionRequestValidationException.class, () ->
client.admin().indices().prepareClose((String[])null).execute().actionGet());
assertThat(e.getMessage(), containsString("index is missing"));
}
public void testOpenNoIndex() { public void testOpenNoIndex() {
Client client = client(); Client client = client();
Exception e = expectThrows(ActionRequestValidationException.class, () -> Exception e = expectThrows(ActionRequestValidationException.class, () ->
@ -241,23 +199,6 @@ public class OpenCloseIndexIT extends ESIntegTestCase {
assertIndexIsOpened("test1"); assertIndexIsOpened("test1");
} }
public void testCloseAlreadyClosedIndex() {
Client client = client();
createIndex("test1");
ClusterHealthResponse healthResponse = client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet();
assertThat(healthResponse.isTimedOut(), equalTo(false));
//closing the index
AcknowledgedResponse closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosed("test1");
//no problem if we try to close an index that's already in close state
closeIndexResponse = client.admin().indices().prepareClose("test1").execute().actionGet();
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
assertIndexIsClosed("test1");
}
public void testSimpleCloseOpenAlias() { public void testSimpleCloseOpenAlias() {
Client client = client(); Client client = client();
createIndex("test1"); createIndex("test1");

View File

@ -26,7 +26,6 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.ShardRoutingState;
@ -35,6 +34,7 @@ import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.IndexClosedException; import org.elasticsearch.indices.IndexClosedException;
import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.nullValue;
@ -61,8 +61,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
client().prepareIndex("test", "type1", "1").setSource("field1", "value1").get(); client().prepareIndex("test", "type1", "1").setSource("field1", "value1").get();
logger.info("--> closing test index..."); logger.info("--> closing test index...");
AcknowledgedResponse closeIndexResponse = client().admin().indices().prepareClose("test").get(); assertAcked(client().admin().indices().prepareClose("test"));
assertThat(closeIndexResponse.isAcknowledged(), equalTo(true));
stateResponse = client().admin().cluster().prepareState().get(); stateResponse = client().admin().cluster().prepareState().get();
assertThat(stateResponse.getState().metaData().index("test").getState(), equalTo(IndexMetaData.State.CLOSE)); assertThat(stateResponse.getState().metaData().index("test").getState(), equalTo(IndexMetaData.State.CLOSE));
@ -103,7 +102,7 @@ public class SimpleIndexStateIT extends ESIntegTestCase {
assertThat(health.isTimedOut(), equalTo(false)); assertThat(health.isTimedOut(), equalTo(false));
assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED)); assertThat(health.getStatus(), equalTo(ClusterHealthStatus.RED));
client().admin().indices().prepareClose("test").get(); assertAcked(client().admin().indices().prepareClose("test"));
logger.info("--> updating test index settings to allow allocation"); logger.info("--> updating test index settings to allow allocation");
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder() client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder()

View File

@ -594,7 +594,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
equalTo(SnapshotState.PARTIAL)); equalTo(SnapshotState.PARTIAL));
} }
assertAcked(client().admin().indices().prepareClose("test-idx-some", "test-idx-all").execute().actionGet()); assertAcked(client().admin().indices().prepareClose("test-idx-some", "test-idx-all"));
logger.info("--> restore incomplete snapshot - should fail"); logger.info("--> restore incomplete snapshot - should fail");
assertThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false) assertThrows(client().admin().cluster().prepareRestoreSnapshot("test-repo", "test-snap-2").setRestoreGlobalState(false)

View File

@ -20,12 +20,10 @@ package org.elasticsearch.test;/*
import com.carrotsearch.randomizedtesting.RandomizedTest; import com.carrotsearch.randomizedtesting.RandomizedTest;
import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
import com.carrotsearch.randomizedtesting.generators.RandomStrings; import com.carrotsearch.randomizedtesting.generators.RandomStrings;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.bulk.BulkResponse;
@ -63,6 +61,7 @@ public class BackgroundIndexer implements AutoCloseable {
final Semaphore availableBudget = new Semaphore(0); final Semaphore availableBudget = new Semaphore(0);
final boolean useAutoGeneratedIDs; final boolean useAutoGeneratedIDs;
private final Set<String> ids = ConcurrentCollections.newConcurrentSet(); private final Set<String> ids = ConcurrentCollections.newConcurrentSet();
private boolean assertNoFailuresOnStop = true;
volatile int minFieldSize = 10; volatile int minFieldSize = 10;
volatile int maxFieldSize = 140; volatile int maxFieldSize = 140;
@ -163,13 +162,11 @@ public class BackgroundIndexer implements AutoCloseable {
} }
BulkResponse bulkResponse = bulkRequest.get(); BulkResponse bulkResponse = bulkRequest.get();
for (BulkItemResponse bulkItemResponse : bulkResponse) { for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (!bulkItemResponse.isFailed()) { if (bulkItemResponse.isFailed() == false) {
boolean add = ids.add(bulkItemResponse.getId()); boolean add = ids.add(bulkItemResponse.getId());
assert add : "ID: " + bulkItemResponse.getId() + " already used"; assert add : "ID: " + bulkItemResponse.getId() + " already used";
} else { } else {
throw new ElasticsearchException("bulk request failure, id: [" failures.add(bulkItemResponse.getFailure().getCause());
+ bulkItemResponse.getFailure().getId() + "] message: "
+ bulkItemResponse.getFailure().getMessage());
} }
} }
@ -283,8 +280,10 @@ public class BackgroundIndexer implements AutoCloseable {
} }
stop.set(true); stop.set(true);
Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true)); Assert.assertThat("timeout while waiting for indexing threads to stop", stopLatch.await(6, TimeUnit.MINUTES), equalTo(true));
if (assertNoFailuresOnStop) {
assertNoFailures(); assertNoFailures();
} }
}
public long totalIndexedDocs() { public long totalIndexedDocs() {
return ids.size(); return ids.size();
@ -308,6 +307,10 @@ public class BackgroundIndexer implements AutoCloseable {
maxFieldSize = fieldSize; maxFieldSize = fieldSize;
} }
public void setAssertNoFailuresOnStop(final boolean assertNoFailuresOnStop) {
this.assertNoFailuresOnStop = assertNoFailuresOnStop;
}
@Override @Override
public void close() throws Exception { public void close() throws Exception {
stop(); stop();

View File

@ -5,11 +5,14 @@
*/ */
package org.elasticsearch.xpack.core.action; package org.elasticsearch.xpack.core.action;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceNotFoundException; import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.Action; import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest; import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.admin.indices.close.CloseIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.close.TransportCloseIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest; import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse; import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActionFilters;
@ -54,17 +57,20 @@ public final class TransportFreezeIndexAction extends
private final DestructiveOperations destructiveOperations; private final DestructiveOperations destructiveOperations;
private final MetaDataIndexStateService indexStateService; private final MetaDataIndexStateService indexStateService;
private final TransportCloseIndexAction transportCloseIndexAction;
@Inject @Inject
public TransportFreezeIndexAction(MetaDataIndexStateService indexStateService, TransportService transportService, public TransportFreezeIndexAction(MetaDataIndexStateService indexStateService, TransportService transportService,
ClusterService clusterService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, IndexNameExpressionResolver indexNameExpressionResolver,
DestructiveOperations destructiveOperations) { DestructiveOperations destructiveOperations,
TransportCloseIndexAction transportCloseIndexAction) {
super(FreezeIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver, super(FreezeIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, indexNameExpressionResolver,
FreezeRequest::new); FreezeRequest::new);
this.destructiveOperations = destructiveOperations; this.destructiveOperations = destructiveOperations;
this.indexStateService = indexStateService; this.indexStateService = indexStateService;
this.transportCloseIndexAction = transportCloseIndexAction;
} }
@Override @Override
protected String executor() { protected String executor() {
@ -108,6 +114,33 @@ public final class TransportFreezeIndexAction extends
listener.onResponse(new FreezeResponse(true, true)); listener.onResponse(new FreezeResponse(true, true));
return; return;
} }
final CloseIndexClusterStateUpdateRequest closeRequest = new CloseIndexClusterStateUpdateRequest()
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.indices(concreteIndices);
indexStateService.closeIndices(closeRequest, new ActionListener<AcknowledgedResponse>() {
@Override
public void onResponse(final AcknowledgedResponse response) {
if (response.isAcknowledged()) {
toggleFrozenSettings(concreteIndices, request, listener);
} else {
// TODO improve FreezeResponse so that it also reports failures from the close index API
listener.onResponse(new FreezeResponse(false, false));
}
}
@Override
public void onFailure(final Exception t) {
logger.debug(() -> new ParameterizedMessage("failed to close indices [{}]", (Object) concreteIndices), t);
listener.onFailure(t);
}
});
}
private void toggleFrozenSettings(final Index[] concreteIndices, final FreezeRequest request,
final ActionListener<FreezeResponse> listener) {
clusterService.submitStateUpdateTask("toggle-frozen-settings", clusterService.submitStateUpdateTask("toggle-frozen-settings",
new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, new ActionListener<AcknowledgedResponse>() { new AckedClusterStateUpdateTask<AcknowledgedResponse>(Priority.URGENT, request, new ActionListener<AcknowledgedResponse>() {
@Override @Override
@ -136,14 +169,6 @@ public final class TransportFreezeIndexAction extends
}) { }) {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
List<Index> toClose = new ArrayList<>();
for (Index index : concreteIndices) {
IndexMetaData metaData = currentState.metaData().index(index);
if (metaData.getState() != IndexMetaData.State.CLOSE) {
toClose.add(index);
}
}
currentState = indexStateService.closeIndices(currentState, toClose.toArray(new Index[0]), toClose.toString());
final MetaData.Builder builder = MetaData.builder(currentState.metaData()); final MetaData.Builder builder = MetaData.builder(currentState.metaData());
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks()); ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
for (Index index : concreteIndices) { for (Index index : concreteIndices) {

View File

@ -59,7 +59,7 @@ public final class IndexPrivilege extends Privilege {
GetIndexAction.NAME, IndicesExistsAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME, GetIndexAction.NAME, IndicesExistsAction.NAME, GetFieldMappingsAction.NAME + "*", GetMappingsAction.NAME,
ClusterSearchShardsAction.NAME, TypesExistsAction.NAME, ValidateQueryAction.NAME + "*", GetSettingsAction.NAME); ClusterSearchShardsAction.NAME, TypesExistsAction.NAME, ValidateQueryAction.NAME + "*", GetSettingsAction.NAME);
private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME, UnfollowAction.NAME, private static final Automaton MANAGE_FOLLOW_INDEX_AUTOMATON = patterns(PutFollowAction.NAME, UnfollowAction.NAME,
CloseIndexAction.NAME); CloseIndexAction.NAME + "*");
public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY); public static final IndexPrivilege NONE = new IndexPrivilege("none", Automatons.EMPTY);
public static final IndexPrivilege ALL = new IndexPrivilege("all", ALL_AUTOMATON); public static final IndexPrivilege ALL = new IndexPrivilege("all", ALL_AUTOMATON);