From 10b5ffcda5400acabb74c397d100b0034f5fe33a Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Mon, 1 Feb 2016 11:11:57 +0100 Subject: [PATCH] Add proper handoff between old and new copy of relocating primary shard When primary relocation completes, a cluster state is propagated that deactivates the old primary and marks the new primary as active. As cluster state changes are not applied synchronously on all nodes, there can be a time interval where the relocation target has processed the cluster state and believes itself to be the active primary and the relocation source has not yet processed the cluster state update and still believes itself to be the active primary. This commit ensures that, before completing the relocation, the relocation source deactivates writing to its store and delegates requests to the relocation target. Closes #15900 --- .../flush/TransportShardFlushAction.java | 2 +- .../refresh/TransportShardRefreshAction.java | 2 +- .../action/index/TransportIndexAction.java | 4 +- .../TransportReplicationAction.java | 177 +++++++++++------- .../action/index/MappingUpdatedAction.java | 6 +- .../elasticsearch/index/shard/IndexShard.java | 97 +++++----- .../shard/IndexShardRelocatedException.java | 8 +- .../cluster/IndicesClusterStateService.java | 8 +- .../indices/flush/SyncedFlushService.java | 2 +- .../indices/recovery/RecoverySource.java | 10 +- .../recovery/RecoverySourceHandler.java | 14 +- .../indices/recovery/RecoveryState.java | 2 +- .../indices/recovery/RecoveryTarget.java | 4 +- .../SharedFSRecoverySourceHandler.java | 4 - .../recovery/StartRecoveryRequest.java | 11 +- .../TransportChannelResponseHandler.java | 16 +- .../ClusterStateCreationUtils.java | 28 +-- .../TransportReplicationActionTests.java | 137 +++++++++++--- .../action/shard/ShardStateActionTests.java | 13 +- .../index/shard/IndexShardTests.java | 137 ++++++++++++-- .../indices/IndicesLifecycleListenerIT.java | 3 +- .../flush/SyncedFlushSingleNodeTests.java | 6 +- 
.../recovery/IndexPrimaryRelocationIT.java | 89 +++++++++ .../indices/recovery/IndexRecoveryIT.java | 14 +- .../recovery/RecoverySourceHandlerTests.java | 6 +- .../recovery/StartRecoveryRequestTests.java | 5 +- .../recovery/FullRollingRestartIT.java | 4 +- .../test/InternalTestCluster.java | 2 +- 28 files changed, 576 insertions(+), 235 deletions(-) create mode 100644 core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 8f7fce89c09..302bdafc471 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -58,7 +58,7 @@ public class TransportShardFlushAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.flush(shardRequest.getRequest()); logger.trace("{} flush request executed on primary", indexShard.shardId()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 7c9979e7374..2dd41f7801d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -60,7 +60,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction 
shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.refresh("api"); logger.trace("{} refresh request executed on primary", indexShard.shardId()); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 33bf3547d0b..fdd018c51f2 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -140,7 +140,7 @@ public class TransportIndexAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Exception { // validate, if routing is required, that we got routing IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex()); @@ -200,7 +200,7 @@ public class TransportIndexAction extends TransportReplicationAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable { + public static WriteResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); final ShardId shardId = indexShard.shardId(); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java 
index c40d3fb579a..07e4322f6b0 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; @@ -156,10 +157,11 @@ public abstract class TransportReplicationAction shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable; + protected abstract Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception; /** * Replica operation on nodes with replica copies @@ -299,7 +301,7 @@ public abstract class TransportReplicationAction handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler); } @@ -352,6 +354,7 @@ public abstract class TransportReplicationAction * Note that as soon as we move to replication action, state responsibility is transferred to {@link ReplicationPhase}. 
*/ - final class PrimaryPhase extends AbstractRunnable { + class PrimaryPhase extends AbstractRunnable { private final Request request; + private final ShardId shardId; private final TransportChannel channel; private final ClusterState state; private final AtomicBoolean finished = new AtomicBoolean(); - private Releasable indexShardReference; + private IndexShardReference indexShardReference; PrimaryPhase(Request request, TransportChannel channel) { this.state = clusterService.state(); this.request = request; + assert request.shardId() != null : "request shardId must be set prior to primary phase"; + this.shardId = request.shardId(); this.channel = channel; } @Override public void onFailure(Throwable e) { + if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { + if (logger.isTraceEnabled()) { + logger.trace("failed to execute [{}] on [{}]", e, request, shardId); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("failed to execute [{}] on [{}]", e, request, shardId); + } + } finishAsFailed(e); } @Override protected void doRun() throws Exception { // request shardID was set in ReroutePhase - assert request.shardId() != null : "request shardID must be set prior to primary phase"; - final ShardId shardId = request.shardId(); final String writeConsistencyFailure = checkWriteConsistency(shardId); if (writeConsistencyFailure != null) { finishBecauseUnavailable(shardId, writeConsistencyFailure); return; } - final ReplicationPhase replicationPhase; - try { - indexShardReference = getIndexShardOperationsCounter(shardId); + // closed in finishAsFailed(e) in the case of error + indexShardReference = getIndexShardReferenceOnPrimary(shardId); + if (indexShardReference.isRelocated() == false) { + // execute locally Tuple primaryResponse = shardOperationOnPrimary(state.metaData(), request); if (logger.isTraceEnabled()) { logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, 
state.version()); } - replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); - } catch (Throwable e) { - if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { - if (logger.isTraceEnabled()) { - logger.trace("failed to execute [{}] on [{}]", e, request, shardId); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("failed to execute [{}] on [{}]", e, request, shardId); - } - } - finishAsFailed(e); - return; + ReplicationPhase replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); + finishAndMoveToReplication(replicationPhase); + } else { + // delegate primary phase to relocation target + // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary + // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. + final ShardRouting primary = indexShardReference.routingEntry(); + indexShardReference.close(); + assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; + DiscoveryNode relocatingNode = state.nodes().get(primary.relocatingNodeId()); + transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions, + TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel, + "rerouting indexing to target primary " + primary)); } - finishAndMoveToReplication(replicationPhase); } /** @@ -721,10 +736,24 @@ public abstract class TransportReplicationAction readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); + private static final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); + // for primaries, we only allow to write when actually started (so the cluster 
has decided we started) + // in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be + // in state RECOVERING or POST_RECOVERY. After a primary has been marked as RELOCATED, we only allow writes to the relocation target + // which can be either in POST_RECOVERY or already STARTED (this prevents writing concurrently to two primaries). + public static final EnumSet writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + // replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure its consistent + // a relocated shard can also be target of a replication if the relocation target has not been marked as active yet and is syncing it's changes back to the relocation source + private static final EnumSet writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private final IndexSearcherWrapper searcherWrapper; @@ -250,7 +258,7 @@ public class IndexShard extends AbstractIndexShardComponent { } this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); - this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); + this.suspendableRefContainer = new SuspendableRefContainer(); this.provider = provider; this.searcherWrapper = indexSearcherWrapper; this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, newQueryShardContext()); @@ -321,6 +329,8 @@ public class IndexShard extends AbstractIndexShardComponent { * Updates the shards routing entry. This mutate the shards internal state depending * on the changes that get introduced by the new routing value. This method will persist shard level metadata * unless explicitly disabled. 
+ * + * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted */ public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) { final ShardRouting currentRouting = this.shardRouting; @@ -368,6 +378,14 @@ public class IndexShard extends AbstractIndexShardComponent { } } } + + if (state == IndexShardState.RELOCATED && + (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { + // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery + // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two + // active primaries. + throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); + } this.shardRouting = newRouting; indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); } finally { @@ -404,12 +422,16 @@ public class IndexShard extends AbstractIndexShardComponent { } public IndexShard relocated(String reason) throws IndexShardNotStartedException { - synchronized (mutex) { - if (state != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, state); + try (Releasable block = suspendableRefContainer.blockAcquisition()) { + // no shard operation locks are being held here, move state from started to relocated + synchronized (mutex) { + if (state != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, state); + } + changeState(IndexShardState.RELOCATED, reason); } - changeState(IndexShardState.RELOCATED, reason); } + return this; } @@ -796,7 +818,6 @@ public class IndexShard extends AbstractIndexShardComponent { refreshScheduledFuture = null; } changeState(IndexShardState.CLOSED, reason); - indexShardOperationCounter.decRef(); } finally { final Engine engine = this.currentEngineReference.getAndSet(null); try { @@ 
-810,7 +831,6 @@ public class IndexShard extends AbstractIndexShardComponent { } } - public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { refresh("percolator_load_queries"); @@ -967,16 +987,17 @@ public class IndexShard extends AbstractIndexShardComponent { IndexShardState state = this.state; // one time volatile read if (origin == Engine.Operation.Origin.PRIMARY) { - // for primaries, we only allow to write when actually started (so the cluster has decided we started) - // otherwise, we need to retry, we also want to still allow to index if we are relocated in case it fails - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { - throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering, origin [" + origin + "]"); + if (writeAllowedStatesForPrimary.contains(state) == false) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForPrimary + ", origin [" + origin + "]"); + } + } else if (origin == Engine.Operation.Origin.RECOVERY) { + if (state != IndexShardState.RECOVERING) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when recovering, origin [" + origin + "]"); } } else { - // for replicas, we allow to write also while recovering, since we index also during recovery to replicas - // and rely on version checks to make sure its consistent - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { - throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering, origin [" + origin + "]"); + assert origin == Engine.Operation.Origin.REPLICA; + if (writeAllowedStatesForReplica.contains(state) 
== false) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForReplica + ", origin [" + origin + "]"); } } } @@ -995,7 +1016,7 @@ public class IndexShard extends AbstractIndexShardComponent { private void verifyNotClosed(Throwable suppressed) throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read if (state == IndexShardState.CLOSED) { - final IllegalIndexShardStateException exc = new IllegalIndexShardStateException(shardId, state, "operation only allowed when not closed"); + final IllegalIndexShardStateException exc = new IndexShardClosedException(shardId, "operation only allowed when not closed"); if (suppressed != null) { exc.addSuppressed(suppressed); } @@ -1390,37 +1411,21 @@ public class IndexShard extends AbstractIndexShardComponent { idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME)); } - private static class IndexShardOperationCounter extends AbstractRefCounted { - final private ESLogger logger; - private final ShardId shardId; - - public IndexShardOperationCounter(ESLogger logger, ShardId shardId) { - super("index-shard-operations-counter"); - this.logger = logger; - this.shardId = shardId; - } - - @Override - protected void closeInternal() { - logger.debug("operations counter reached 0, will not accept any further writes"); - } - - @Override - protected void alreadyClosed() { - throw new IndexShardClosedException(shardId, "could not increment operation counter. 
shard is closed."); + public Releasable acquirePrimaryOperationLock() { + verifyNotClosed(); + if (shardRouting.primary() == false) { + throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); } + return suspendableRefContainer.acquireUninterruptibly(); } - public void incrementOperationCounter() { - indexShardOperationCounter.incRef(); + public Releasable acquireReplicaOperationLock() { + verifyNotClosed(); + return suspendableRefContainer.acquireUninterruptibly(); } - public void decrementOperationCounter() { - indexShardOperationCounter.decRef(); - } - - public int getOperationsCount() { - return Math.max(0, indexShardOperationCounter.refCount() - 1); // refCount is incremented on creation and decremented on close + public int getActiveOperationsCount() { + return suspendableRefContainer.activeRefs(); // refCount is incremented on creation and decremented on close } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java index 2d3c48cd4c5..043ad892777 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java @@ -29,10 +29,14 @@ import java.io.IOException; public class IndexShardRelocatedException extends IllegalIndexShardStateException { public IndexShardRelocatedException(ShardId shardId) { - super(shardId, IndexShardState.RELOCATED, "Already relocated"); + this(shardId, "Already relocated"); + } + + public IndexShardRelocatedException(ShardId shardId, String reason) { + super(shardId, IndexShardState.RELOCATED, reason); } public IndexShardRelocatedException(StreamInput in) throws IOException{ super(in); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java 
b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 8c2f23f7081..7fc12eb8bab 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -492,7 +492,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent> ongoingRecoveries = new HashMap<>(); synchronized void add(IndexShard shard, RecoverySourceHandler handler) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 8cbdfca0221..26c288cfbca 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -393,9 +393,11 @@ public class RecoverySourceHandler { } }); - - if (request.markAsRelocated()) { - // TODO what happens if the recovery process fails afterwards, we need to mark this back to started + if (isPrimaryRelocation()) { + /** + * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and + * target are failed (see {@link IndexShard#updateRoutingEntry}). 
+ */ try { shard.relocated("to " + request.targetNode()); } catch (IllegalIndexShardStateException e) { @@ -406,7 +408,11 @@ public class RecoverySourceHandler { } stopWatch.stop(); logger.trace("[{}][{}] finalizing recovery to {}: took [{}]", - indexName, shardId, request.targetNode(), stopWatch.totalTime()); + indexName, shardId, request.targetNode(), stopWatch.totalTime()); + } + + protected boolean isPrimaryRelocation() { + return request.recoveryType() == RecoveryState.Type.PRIMARY_RELOCATION; } /** diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 92bfc87218a..d1c41d4b932 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -101,7 +101,7 @@ public class RecoveryState implements ToXContent, Streamable { STORE((byte) 0), SNAPSHOT((byte) 1), REPLICA((byte) 2), - RELOCATION((byte) 3); + PRIMARY_RELOCATION((byte) 3); private static final Type[] TYPES = new Type[Type.values().length]; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 0912a22a0f5..727bd0b6441 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -138,7 +138,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe // create a new recovery status, and process... 
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); threadPool.generic().execute(new RecoveryRunner(recoveryId)); - } protected void retryRecovery(final RecoveryStatus recoveryStatus, final Throwable reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) { @@ -178,7 +177,7 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe return; } final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), - false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); + metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); final AtomicReference responseHolder = new AtomicReference<>(); try { @@ -267,7 +266,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false); return; } - onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, e), true); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 16bd1d46553..8d75c474791 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -84,8 +84,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { return 0; } - private boolean isPrimaryRelocation() { - return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary(); - } - } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java 
b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 3a62f4f6352..49dd70a73f7 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -41,8 +41,6 @@ public class StartRecoveryRequest extends TransportRequest { private DiscoveryNode targetNode; - private boolean markAsRelocated; - private Store.MetadataSnapshot metadataSnapshot; private RecoveryState.Type recoveryType; @@ -56,12 +54,11 @@ public class StartRecoveryRequest extends TransportRequest { * @param sourceNode The node to recover from * @param targetNode The node to recover to */ - public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) { + public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) { this.recoveryId = recoveryId; this.shardId = shardId; this.sourceNode = sourceNode; this.targetNode = targetNode; - this.markAsRelocated = markAsRelocated; this.recoveryType = recoveryType; this.metadataSnapshot = metadataSnapshot; } @@ -82,10 +79,6 @@ public class StartRecoveryRequest extends TransportRequest { return targetNode; } - public boolean markAsRelocated() { - return markAsRelocated; - } - public RecoveryState.Type recoveryType() { return recoveryType; } @@ -101,7 +94,6 @@ public class StartRecoveryRequest extends TransportRequest { shardId = ShardId.readShardId(in); sourceNode = DiscoveryNode.readNode(in); targetNode = DiscoveryNode.readNode(in); - markAsRelocated = in.readBoolean(); metadataSnapshot = new Store.MetadataSnapshot(in); recoveryType = RecoveryState.Type.fromId(in.readByte()); @@ -114,7 +106,6 @@ public class StartRecoveryRequest extends 
TransportRequest { shardId.writeTo(out); sourceNode.writeTo(out); targetNode.writeTo(out); - out.writeBoolean(markAsRelocated); metadataSnapshot.writeTo(out); out.writeByte(recoveryType.id()); } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 8c042cd1937..69fc73e4af0 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.function.Supplier; /** * Base class for delegating transport response to a transport channel @@ -30,7 +31,7 @@ import java.io.IOException; public abstract class TransportChannelResponseHandler implements TransportResponseHandler { /** - * Convenience method for delegating an empty response to the provided changed + * Convenience method for delegating an empty response to the provided transport channel */ public static TransportChannelResponseHandler emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { @@ -41,6 +42,19 @@ public abstract class TransportChannelResponseHandler TransportChannelResponseHandler responseHandler(ESLogger logger, Supplier responseSupplier, TransportChannel channel, String extraInfoOnError) { + return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { + @Override + public T newInstance() { + return responseSupplier.get(); + } + }; + } + + private final ESLogger logger; private final TransportChannel channel; private final String extraInfoOnError; diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java 
b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 49a5e072e1f..8e7b70a3b21 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -56,12 +56,12 @@ public class ClusterStateCreationUtils { /** * Creates cluster state with and index that has one shard and #(replicaStates) replicas * - * @param index name of the index - * @param primaryLocal if primary should coincide with the local node in the cluster state - * @param primaryState state of primary - * @param replicaStates states of the replicas. length of this array determines also the number of replicas + * @param index name of the index + * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state + * @param primaryState state of primary + * @param replicaStates states of the replicas. length of this array determines also the number of replicas */ - public static ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { + public static ClusterState state(String index, boolean activePrimaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { final int numberOfReplicas = replicaStates.length; int numberOfNodes = numberOfReplicas + 1; @@ -97,7 +97,7 @@ public class ClusterStateCreationUtils { String relocatingNode = null; UnassignedInfo unassignedInfo = null; if (primaryState != ShardRoutingState.UNASSIGNED) { - if (primaryLocal) { + if (activePrimaryLocal) { primaryNode = newNode(0).id(); unassignedNodes.remove(primaryNode); } else { @@ -173,13 +173,13 @@ public class ClusterStateCreationUtils { * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. 
* Primary will be STARTED in cluster state but replicas will be one of UNASSIGNED, INITIALIZING, STARTED or RELOCATING. * - * @param index name of the index - * @param primaryLocal if primary should coincide with the local node in the cluster state - * @param numberOfReplicas number of replicas + * @param index name of the index + * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state + * @param numberOfReplicas number of replicas */ - public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { + public static ClusterState stateWithActivePrimary(String index, boolean activePrimaryLocal, int numberOfReplicas) { int assignedReplicas = randomIntBetween(0, numberOfReplicas); - return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); + return stateWithActivePrimary(index, activePrimaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); } /** @@ -188,11 +188,11 @@ public class ClusterStateCreationUtils { * some (assignedReplicas) will be one of INITIALIZING, STARTED or RELOCATING. 
* * @param index name of the index - * @param primaryLocal if primary should coincide with the local node in the cluster state + * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state * @param assignedReplicas number of replicas that should have INITIALIZING, STARTED or RELOCATING state * @param unassignedReplicas number of replicas that should be unassigned */ - public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) { + public static ClusterState stateWithActivePrimary(String index, boolean activePrimaryLocal, int assignedReplicas, int unassignedReplicas) { ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; // no point in randomizing - node assignment later on does it too. for (int i = 0; i < assignedReplicas; i++) { @@ -201,7 +201,7 @@ public class ClusterStateCreationUtils { for (int i = assignedReplicas; i < replicaStates.length; i++) { replicaStates[i] = ShardRoutingState.UNASSIGNED; } - return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); + return state(index, activePrimaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); } /** diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 402a454649b..4542be25485 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; 
import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; @@ -75,9 +76,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; -import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.empty; @@ -225,7 +227,7 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - clusterService.setState(stateWithStartedPrimary(index, randomBoolean(), 3)); + clusterService.setState(stateWithActivePrimary(index, randomBoolean(), 3)); logger.debug("using state: \n{}", clusterService.state().prettyPrint()); @@ -249,33 +251,73 @@ public class TransportReplicationActionTests extends ESTestCase { assertIndexShardUninitialized(); } - public void testPrimaryPhaseExecutesRequest() throws InterruptedException, ExecutionException { + public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throws InterruptedException, ExecutionException { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - clusterService.setState(state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); 
+ clusterService.setState(state); Request request = new Request(shardId).timeout("1ms"); PlainActionFuture listener = new PlainActionFuture<>(); - TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)); + AtomicBoolean movedToReplication = new AtomicBoolean(); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) { + @Override + void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) { + super.finishAndMoveToReplication(replicationPhase); + movedToReplication.set(true); + } + }; + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + boolean executeOnPrimary = true; + if (primaryShard.relocating() && randomBoolean()) { // whether shard has been marked as relocated already (i.e. relocation completed) + isRelocated.set(true); + indexShardRouting.set(primaryShard); + executeOnPrimary = false; + } primaryPhase.run(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); - final String replicaNodeId = clusterService.state().getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0).currentNodeId(); - final List requests = transport.getCapturedRequestsByTargetNodeAndClear().get(replicaNodeId); - assertThat(requests, notNullValue()); - assertThat(requests.size(), equalTo(1)); - assertThat("replica request was not sent", requests.get(0).action, equalTo("testAction[r]")); + assertThat(request.processedOnPrimary.get(), equalTo(executeOnPrimary)); + assertThat(movedToReplication.get(), equalTo(executeOnPrimary)); + if (executeOnPrimary == false) { + final List requests = transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId()); + assertThat(requests, notNullValue()); + assertThat(requests.size(), equalTo(1)); + assertThat("primary request was not delegated to relocation 
target", requests.get(0).action, equalTo("testAction[p]")); + } + } + + public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws InterruptedException, ExecutionException { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + ClusterState state = state(index, true, ShardRoutingState.RELOCATING); + String primaryTargetNodeId = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId(); + // simulate execution of the primary phase on the relocation target node + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build(); + clusterService.setState(state); + Request request = new Request(shardId).timeout("1ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean movedToReplication = new AtomicBoolean(); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) { + @Override + void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) { + super.finishAndMoveToReplication(replicationPhase); + movedToReplication.set(true); + } + }; + primaryPhase.run(); + assertThat("request was not processed on primary relocation target", request.processedOnPrimary.get(), equalTo(true)); + assertThat(movedToReplication.get(), equalTo(true)); } public void testAddedReplicaAfterPrimaryOperation() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // start with no replicas - clusterService.setState(stateWithStartedPrimary(index, true, 0)); + clusterService.setState(stateWithActivePrimary(index, true, 0)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? 
ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED); final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); // add replicas after primary operation ((TestClusterService) clusterService).setState(stateWithAddedReplicas); @@ -302,13 +344,13 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // start with a replica - clusterService.setState(state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED)); + clusterService.setState(state(index, true, ShardRoutingState.STARTED, randomBoolean() ? 
ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); final ClusterState stateWithRelocatingReplica = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); final Action actionWithRelocatingReplicasAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); // set replica to relocating ((TestClusterService) clusterService).setState(stateWithRelocatingReplica); @@ -341,7 +383,7 @@ public class TransportReplicationActionTests extends ESTestCase { final Action actionWithDeletedIndexAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); // delete index after primary op ((TestClusterService) clusterService).setState(stateWithDeletedIndex); @@ -432,7 +474,13 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the replication phase on the relocation 
target node after relocation source was marked as relocated + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + } + clusterService.setState(state); final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); int assignedReplicas = 0; @@ -455,12 +503,19 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - ClusterState state = stateWithStartedPrimary(index, true, randomInt(5)); + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); MetaData.Builder metaData = MetaData.builder(state.metaData()); Settings.Builder settings = Settings.builder().put(metaData.get(index).getSettings()); settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true); metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings)); - clusterService.setState(ClusterState.builder(state).metaData(metaData)); + state = ClusterState.builder(state).metaData(metaData).build(); + + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the primary phase on the relocation target node + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + } + clusterService.setState(state); final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); int assignedReplicas = 0; @@ -507,8 +562,9 @@ public class TransportReplicationActionTests extends ESTestCase { assertEquals(request.shardId, replicationRequest.shardId); } + String localNodeId = clusterService.state().getNodes().localNodeId(); // no request was sent to the local node - assertThat(nodesSentTo.keySet(), 
not(hasItem(clusterService.state().getNodes().localNodeId()))); + assertThat(nodesSentTo.keySet(), not(hasItem(localNodeId))); // requests were sent to the correct shard copies for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId)) { @@ -518,11 +574,11 @@ public class TransportReplicationActionTests extends ESTestCase { if (shard.unassigned()) { continue; } - if (shard.primary() == false) { - nodesSentTo.remove(shard.currentNodeId()); + if (localNodeId.equals(shard.currentNodeId()) == false) { + assertThat(nodesSentTo.remove(shard.currentNodeId()), notNullValue()); } - if (shard.relocating()) { - nodesSentTo.remove(shard.relocatingNodeId()); + if (shard.relocating() && localNodeId.equals(shard.relocatingNodeId()) == false) { // for relocating primaries, we replicate from target to source if source is marked as relocated + assertThat(nodesSentTo.remove(shard.relocatingNodeId()), notNullValue()); } } @@ -629,6 +685,7 @@ public class TransportReplicationActionTests extends ESTestCase { // shard operation should be ongoing, so the counter is at 2 // we have to wait here because increment happens in thread assertBusy(() -> assertIndexShardCounter(2)); + assertThat(transport.capturedRequests().length, equalTo(0)); ((ActionWithDelay) action).countDownLatch.countDown(); t.join(); @@ -726,12 +783,28 @@ public class TransportReplicationActionTests extends ESTestCase { private final AtomicInteger count = new AtomicInteger(0); + private final AtomicBoolean isRelocated = new AtomicBoolean(false); + + private final AtomicReference indexShardRouting = new AtomicReference<>(); + /* * Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run. 
* */ - private synchronized Releasable getOrCreateIndexShardOperationsCounter() { + private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter() { count.incrementAndGet(); - return new Releasable() { + return new TransportReplicationAction.IndexShardReference() { + @Override + public boolean isRelocated() { + return isRelocated.get(); + } + + @Override + public ShardRouting routingEntry() { + ShardRouting shardRouting = indexShardRouting.get(); + assert shardRouting != null; + return shardRouting; + } + @Override public void close() { count.decrementAndGet(); @@ -783,7 +856,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; return new Tuple<>(new Response(), shardRequest); @@ -805,7 +878,11 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Releasable getIndexShardOperationsCounter(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { + return getOrCreateIndexShardOperationsCounter(); + } + + protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) { return getOrCreateIndexShardOperationsCounter(); } } @@ -832,7 +909,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) { return throwException(shardRequest.shardId()); } @@ -870,7 +947,7 @@ public class TransportReplicationActionTests extends ESTestCase { } 
@Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { awaitLatch(); return new Tuple<>(new Response(), shardRequest); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 8a13e6e6ddd..b6e69b27a5a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -33,7 +34,6 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; @@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; -import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -127,7 +126,7 @@ public class ShardStateActionTests extends ESTestCase { public 
void testSuccess() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); @@ -169,7 +168,7 @@ public class ShardStateActionTests extends ESTestCase { public void testNoMaster() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); noMasterBuilder.masterNodeId(null); @@ -207,7 +206,7 @@ public class ShardStateActionTests extends ESTestCase { public void testMasterChannelException() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); @@ -264,7 +263,7 @@ public class ShardStateActionTests extends ESTestCase { public void testUnhandledFailure() { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); @@ -294,7 +293,7 @@ public class ShardStateActionTests extends ESTestCase { public void testShardNotFound() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, 
true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9a4e6a814a3..5c93bbb7d5d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -57,6 +57,8 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; @@ -108,6 +110,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,6 +128,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; /** * Simple unit-test IndexShard related operations. 
@@ -316,36 +320,41 @@ public class IndexShardTests extends ESSingleNodeTestCase { } - public void testDeleteIndexDecreasesCounter() throws InterruptedException, ExecutionException, IOException { + public void testDeleteIndexPreventsNewOperations() throws InterruptedException, ExecutionException, IOException { assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe("test"); IndexShard indexShard = indexService.getShardOrNull(0); client().admin().indices().prepareDelete("test").get(); - assertThat(indexShard.getOperationsCount(), equalTo(0)); + assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.incrementOperationCounter(); + indexShard.acquirePrimaryOperationLock(); + fail("we should not be able to increment anymore"); + } catch (IndexShardClosedException e) { + // expected + } + try { + indexShard.acquireReplicaOperationLock(); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } - public void testIndexShardCounter() throws InterruptedException, ExecutionException, IOException { + public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException { assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe("test"); IndexShard indexShard = indexService.getShardOrNull(0); - assertEquals(0, indexShard.getOperationsCount()); - indexShard.incrementOperationCounter(); - assertEquals(1, indexShard.getOperationsCount()); - 
indexShard.incrementOperationCounter(); - assertEquals(2, indexShard.getOperationsCount()); - indexShard.decrementOperationCounter(); - indexShard.decrementOperationCounter(); - assertEquals(0, indexShard.getOperationsCount()); + assertEquals(0, indexShard.getActiveOperationsCount()); + Releasable operation1 = indexShard.acquirePrimaryOperationLock(); + assertEquals(1, indexShard.getActiveOperationsCount()); + Releasable operation2 = indexShard.acquirePrimaryOperationLock(); + assertEquals(2, indexShard.getActiveOperationsCount()); + Releasables.close(operation1, operation2); + assertEquals(0, indexShard.getActiveOperationsCount()); } public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { @@ -777,6 +786,89 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(total + 1, shard.flushStats().getTotal()); } + public void testLockingBeforeAndAfterRelocated() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + ).get()); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + CountDownLatch latch = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + latch.countDown(); + shard.relocated("simulated recovery"); + }); + + try (Releasable ignored = shard.acquirePrimaryOperationLock()) { + // start finalization of recovery + recoveryThread.start(); + latch.await(); + // recovery can only be finalized after we release the current primaryOperationLock + assertThat(shard.state(), equalTo(IndexShardState.STARTED)); + } + // recovery can be now finalized + recoveryThread.join(); + assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + try (Releasable ignored = shard.acquirePrimaryOperationLock()) { + // lock can again be acquired + 
assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + } + } + + public void testStressRelocated() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + ).get()); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + final int numThreads = randomIntBetween(2, 4); + Thread[] indexThreads = new Thread[numThreads]; + CountDownLatch somePrimaryOperationLockAcquired = new CountDownLatch(1); + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < indexThreads.length; i++) { + indexThreads[i] = new Thread() { + @Override + public void run() { + try (Releasable operationLock = shard.acquirePrimaryOperationLock()) { + somePrimaryOperationLockAcquired.countDown(); + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + } + }; + indexThreads[i].start(); + } + AtomicBoolean relocated = new AtomicBoolean(); + final Thread recoveryThread = new Thread(() -> { + shard.relocated("simulated recovery"); + relocated.set(true); + }); + // ensure we wait for at least one primary operation lock to be acquired + somePrimaryOperationLockAcquired.await(); + // start recovery thread + recoveryThread.start(); + assertThat(relocated.get(), equalTo(false)); + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + // ensure we only transition to RELOCATED state after pending operations completed + assertThat(shard.state(), equalTo(IndexShardState.STARTED)); + // complete pending operations + barrier.await(); + // complete recovery/relocation + recoveryThread.join(); + // ensure relocated successfully once pending operations are done + assertThat(relocated.get(), equalTo(true)); + assertThat(shard.state(), 
equalTo(IndexShardState.RELOCATED)); + assertThat(shard.getActiveOperationsCount(), equalTo(0)); + + for (Thread indexThread : indexThreads) { + indexThread.join(); + } + } + public void testRecoverFromStore() throws IOException { createIndex("test"); ensureGreen(); @@ -857,6 +949,27 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertHitCount(client().prepareSearch().get(), 1); } + public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + ShardRouting origRouting = shard.routingEntry(); + assertThat(shard.state(), equalTo(IndexShardState.STARTED)); + ShardRouting inRecoveryRouting = new ShardRouting(origRouting); + ShardRoutingHelper.relocate(inRecoveryRouting, "some_node"); + shard.updateRoutingEntry(inRecoveryRouting, true); + shard.relocated("simulate mark as relocated"); + assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + ShardRouting failedRecoveryRouting = new ShardRouting(origRouting); + try { + shard.updateRoutingEntry(failedRecoveryRouting, true); + fail("Expected IndexShardRelocatedException"); + } catch (IndexShardRelocatedException expected) { + } + } + public void testRestoreShard() throws IOException { createIndex("test"); createIndex("test_target"); diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java index f1f8a8222cb..b074729cdff 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java @@ -58,6 +58,7 @@ import static org.elasticsearch.index.shard.IndexShardState.CLOSED; import static 
org.elasticsearch.index.shard.IndexShardState.CREATED; import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY; import static org.elasticsearch.index.shard.IndexShardState.RECOVERING; +import static org.elasticsearch.index.shard.IndexShardState.RELOCATED; import static org.elasticsearch.index.shard.IndexShardState.STARTED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.CoreMatchers.equalTo; @@ -181,7 +182,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { ensureGreen(); //the 3 relocated shards get closed on the first node - assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED); + assertShardStatesMatch(stateChangeListenerNode1, 3, RELOCATED, CLOSED); //the 3 relocated shards get created on the second node assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); diff --git a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index c30a5adaaca..239cb7a9096 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; @@ -110,8 +111,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); - shard.incrementOperationCounter(); - try { + try 
(Releasable operationLock = shard.acquirePrimaryOperationLock()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); @@ -121,8 +121,6 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { assertEquals(0, syncedFlushResult.successfulShards()); assertNotEquals(0, syncedFlushResult.totalShards()); assertEquals("[1] ongoing operations on primary", syncedFlushResult.failureReason()); - } finally { - shard.decrementOperationCounter(); } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java new file mode 100644 index 00000000000..727641eb224 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; + +@TestLogging("_root:DEBUG") +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class IndexPrimaryRelocationIT extends ESIntegTestCase { + + private static final int RELOCATION_COUNT = 25; + + public void testPrimaryRelocationWhileIndexing() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3)); + client().admin().indices().prepareCreate("test") + .setSettings(Settings.settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) + .addMapping("type", "field", "type=string") + .get(); + ensureGreen("test"); + + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = new Thread() { + @Override + public void run() { + while (finished.get() == false) { + IndexResponse indexResponse = client().prepareIndex("test", "type", "id").setSource("field", "value").get(); + assertThat("deleted document was found", indexResponse.isCreated(), equalTo(true)); + DeleteResponse deleteResponse = client().prepareDelete("test", "type", "id").get(); + assertThat("indexed document was not found", deleteResponse.isFound(), equalTo(true)); + } + } + }; + indexingThread.start(); + + ClusterState initialState = 
client().admin().cluster().prepareState().get().getState(); + DiscoveryNode[] dataNodes = initialState.getNodes().dataNodes().values().toArray(DiscoveryNode.class); + DiscoveryNode relocationSource = initialState.getNodes().dataNodes().get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId()); + for (int i = 0; i < RELOCATION_COUNT; i++) { + DiscoveryNode relocationTarget = randomFrom(dataNodes); + while (relocationTarget.equals(relocationSource)) { + relocationTarget = randomFrom(dataNodes); + } + logger.info("--> [iteration {}] relocating from {} to {} ", i, relocationSource.getName(), relocationTarget.getName()); + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId())) + .execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + logger.info("--> [iteration {}] relocation complete", i); + relocationSource = relocationTarget; + if (indexingThread.isAlive() == false) { // indexing process aborted early, no need for more relocations as test has already failed + break; + } + + } + finished.set(true); + indexingThread.join(); + } +} diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 12acea4f9ac..cc11cb82057 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -286,7 +286,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - 
assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, nodeA, nodeB, false); + assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, nodeA, nodeB, false); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); logger.info("--> request node recovery stats"); @@ -339,7 +339,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); assertThat(recoveryStates.size(), equalTo(1)); - assertRecoveryState(recoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + assertRecoveryState(recoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false); validateIndexRecoveryState(recoveryStates.get(0).getIndex()); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); @@ -400,7 +400,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) @@ -421,7 +421,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false); 
validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) @@ -503,7 +503,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { - docs[i] = client().prepareIndex(INDEX_NAME, INDEX_TYPE). + docs[i] = client().prepareIndex(name, INDEX_TYPE). setSource("foo-int", randomInt(), "foo-string", randomAsciiOfLength(32), "foo-float", randomFloat()); @@ -511,8 +511,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { indexRandom(true, docs); flush(); - assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); - return client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet(); + assertThat(client().prepareSearch(name).setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); + return client().admin().indices().prepareStats(name).execute().actionGet(); } private void validateIndexRecoveryState(RecoveryState.Index indexState) { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index c8cad5be296..b29404d59b6 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -69,7 +69,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), - randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); + null, RecoveryState.Type.STORE, randomLong()); Store store = newStore(createTempDir()); 
RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger); Directory dir = store.directory(); @@ -118,7 +118,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), - randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); + null, RecoveryState.Type.STORE, randomLong()); Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); @@ -181,7 +181,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), - randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); + null, RecoveryState.Type.STORE, randomLong()); Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java index c7a7852e426..3406388bd5b 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java @@ -43,11 +43,9 @@ public class StartRecoveryRequestTests extends ESTestCase { new ShardId("test", "_na_", 0), new DiscoveryNode("a", new LocalTransportAddress("1"), targetNodeVersion), new DiscoveryNode("b", new LocalTransportAddress("1"), targetNodeVersion), - true, Store.MetadataSnapshot.EMPTY, - RecoveryState.Type.RELOCATION, + RecoveryState.Type.PRIMARY_RELOCATION, 1L - ); 
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); @@ -63,7 +61,6 @@ public class StartRecoveryRequestTests extends ESTestCase { assertThat(outRequest.shardId(), equalTo(inRequest.shardId())); assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode())); assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode())); - assertThat(outRequest.markAsRelocated(), equalTo(inRequest.markAsRelocated())); assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap())); assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId())); assertThat(outRequest.recoveryType(), equalTo(inRequest.recoveryType())); diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java index 61dca3f37af..8c6b71c9eac 100644 --- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java @@ -151,7 +151,7 @@ public class FullRollingRestartIT extends ESIntegTestCase { ClusterState state = client().admin().cluster().prepareState().get().getState(); RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { - assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION); + assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.PRIMARY_RELOCATION); } internalCluster().restartRandomDataNode(); ensureGreen(); @@ -159,7 +159,7 @@ public class FullRollingRestartIT extends ESIntegTestCase { recoveryResponse = 
client().admin().indices().prepareRecoveries("test").get(); for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { - assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION); + assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.PRIMARY_RELOCATION); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 658264864e0..d9f634d503e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1036,7 +1036,7 @@ public final class InternalTestCluster extends TestCluster { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getOperationsCount(), equalTo(0)); + assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getActiveOperationsCount(), equalTo(0)); } } }