diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 8f7fce89c09..302bdafc471 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -58,7 +58,7 @@ public class TransportShardFlushAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, ShardFlushRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.flush(shardRequest.getRequest()); logger.trace("{} flush request executed on primary", indexShard.shardId()); diff --git a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 7c9979e7374..2dd41f7801d 100644 --- a/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/core/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -60,7 +60,7 @@ public class TransportShardRefreshAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, BasicReplicationRequest shardRequest) { IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.shardId().getIndex()).getShard(shardRequest.shardId().id()); indexShard.refresh("api"); logger.trace("{} refresh request executed on primary", indexShard.shardId()); diff --git a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index 33bf3547d0b..fdd018c51f2 100644 --- a/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/core/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -140,7 +140,7 @@ public class TransportIndexAction extends TransportReplicationAction shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, IndexRequest request) throws Exception { // validate, if routing is required, that we got routing IndexMetaData indexMetaData = metaData.index(request.shardId().getIndex()); @@ -200,7 +200,7 @@ public class TransportIndexAction extends TransportReplicationAction executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Throwable { + public static WriteResult executeIndexRequestOnPrimary(IndexRequest request, IndexShard indexShard, MappingUpdatedAction mappingUpdatedAction) throws Exception { Engine.Index operation = prepareIndexOperationOnPrimary(request, indexShard); Mapping update = operation.parsedDoc().dynamicMappingsUpdate(); final ShardId shardId = indexShard.shardId(); diff --git a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index c40d3fb579a..07e4322f6b0 100644 --- a/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/core/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -56,6 +56,7 @@ import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.shard.IndexShard; +import org.elasticsearch.index.shard.IndexShardState; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.indices.IndicesService; @@ -156,10 +157,11 @@ public abstract class TransportReplicationAction shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable; + protected abstract Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception; /** * Replica operation on nodes with replica copies @@ -299,7 +301,7 @@ public abstract class TransportReplicationAction handler = TransportChannelResponseHandler.emptyResponseHandler(logger, channel, extraMessage); transportService.sendRequest(clusterService.localNode(), transportReplicaAction, request, handler); } @@ -352,6 +354,7 @@ public abstract class TransportReplicationAction * Note that as soon as we move to replication action, state responsibility is transferred to {@link ReplicationPhase}. */ - final class PrimaryPhase extends AbstractRunnable { + class PrimaryPhase extends AbstractRunnable { private final Request request; + private final ShardId shardId; private final TransportChannel channel; private final ClusterState state; private final AtomicBoolean finished = new AtomicBoolean(); - private Releasable indexShardReference; + private IndexShardReference indexShardReference; PrimaryPhase(Request request, TransportChannel channel) { this.state = clusterService.state(); this.request = request; + assert request.shardId() != null : "request shardId must be set prior to primary phase"; + this.shardId = request.shardId(); this.channel = channel; } @Override public void onFailure(Throwable e) { + if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { + if (logger.isTraceEnabled()) { + logger.trace("failed to execute [{}] on [{}]", e, request, shardId); + } + } else { + if (logger.isDebugEnabled()) { + logger.debug("failed to execute [{}] on [{}]", e, request, shardId); + } + } finishAsFailed(e); } @Override protected void doRun() throws Exception { // request shardID was set in ReroutePhase - assert request.shardId() != null : "request shardID must be set prior to primary phase"; - final ShardId shardId = request.shardId(); final String writeConsistencyFailure = checkWriteConsistency(shardId); if (writeConsistencyFailure != null) { finishBecauseUnavailable(shardId, writeConsistencyFailure); return; } - final ReplicationPhase replicationPhase; - try { - indexShardReference = getIndexShardOperationsCounter(shardId); + // closed in finishAsFailed(e) in the case of error + indexShardReference = getIndexShardReferenceOnPrimary(shardId); + if (indexShardReference.isRelocated() == false) { + // execute locally Tuple primaryResponse = shardOperationOnPrimary(state.metaData(), request); if (logger.isTraceEnabled()) { logger.trace("action [{}] completed on shard [{}] for request [{}] with cluster state version [{}]", transportPrimaryAction, shardId, request, state.version()); } - replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); - } catch (Throwable e) { - if (ExceptionsHelper.status(e) == RestStatus.CONFLICT) { - if (logger.isTraceEnabled()) { - logger.trace("failed to execute [{}] on [{}]", e, request, shardId); - } - } else { - if (logger.isDebugEnabled()) { - logger.debug("failed to execute [{}] on [{}]", e, request, shardId); - } - } - finishAsFailed(e); - return; + ReplicationPhase replicationPhase = new ReplicationPhase(primaryResponse.v2(), primaryResponse.v1(), shardId, channel, indexShardReference); + finishAndMoveToReplication(replicationPhase); + } else { + // delegate primary phase to relocation target + // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary + // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase. + final ShardRouting primary = indexShardReference.routingEntry(); + indexShardReference.close(); + assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary; + DiscoveryNode relocatingNode = state.nodes().get(primary.relocatingNodeId()); + transportService.sendRequest(relocatingNode, transportPrimaryAction, request, transportOptions, + TransportChannelResponseHandler.responseHandler(logger, TransportReplicationAction.this::newResponseInstance, channel, + "rerouting indexing to target primary " + primary)); } - finishAndMoveToReplication(replicationPhase); } /** @@ -721,10 +736,24 @@ public abstract class TransportReplicationAction readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); + private static final EnumSet readAllowedStates = EnumSet.of(IndexShardState.STARTED, IndexShardState.RELOCATED, IndexShardState.POST_RECOVERY); + // for primaries, we only allow to write when actually started (so the cluster has decided we started) + // in case we have a relocation of a primary, we also allow to write after phase 2 completed, where the shard may be + // in state RECOVERING or POST_RECOVERY. After a primary has been marked as RELOCATED, we only allow writes to the relocation target + // which can be either in POST_RECOVERY or already STARTED (this prevents writing concurrently to two primaries). + public static final EnumSet writeAllowedStatesForPrimary = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED); + // replication is also allowed while recovering, since we index also during recovery to replicas and rely on version checks to make sure its consistent + // a relocated shard can also be target of a replication if the relocation target has not been marked as active yet and is syncing it's changes back to the relocation source + private static final EnumSet writeAllowedStatesForReplica = EnumSet.of(IndexShardState.RECOVERING, IndexShardState.POST_RECOVERY, IndexShardState.STARTED, IndexShardState.RELOCATED); private final IndexSearcherWrapper searcherWrapper; @@ -250,7 +258,7 @@ public class IndexShard extends AbstractIndexShardComponent { } this.engineConfig = newEngineConfig(translogConfig, cachingPolicy); - this.indexShardOperationCounter = new IndexShardOperationCounter(logger, shardId); + this.suspendableRefContainer = new SuspendableRefContainer(); this.provider = provider; this.searcherWrapper = indexSearcherWrapper; this.percolatorQueriesRegistry = new PercolatorQueriesRegistry(shardId, indexSettings, newQueryShardContext()); @@ -321,6 +329,8 @@ public class IndexShard extends AbstractIndexShardComponent { * Updates the shards routing entry. This mutate the shards internal state depending * on the changes that get introduced by the new routing value. This method will persist shard level metadata * unless explicitly disabled. + * + * @throws IndexShardRelocatedException if shard is marked as relocated and relocation aborted */ public void updateRoutingEntry(final ShardRouting newRouting, final boolean persistState) { final ShardRouting currentRouting = this.shardRouting; @@ -368,6 +378,14 @@ public class IndexShard extends AbstractIndexShardComponent { } } } + + if (state == IndexShardState.RELOCATED && + (newRouting.relocating() == false || newRouting.equalsIgnoringMetaData(currentRouting) == false)) { + // if the shard is marked as RELOCATED we have to fail when any changes in shard routing occur (e.g. due to recovery + // failure / cancellation). The reason is that at the moment we cannot safely move back to STARTED without risking two + // active primaries. + throw new IndexShardRelocatedException(shardId(), "Shard is marked as relocated, cannot safely move to state " + newRouting.state()); + } this.shardRouting = newRouting; indexEventListener.shardRoutingChanged(this, currentRouting, newRouting); } finally { @@ -404,12 +422,16 @@ public class IndexShard extends AbstractIndexShardComponent { } public IndexShard relocated(String reason) throws IndexShardNotStartedException { - synchronized (mutex) { - if (state != IndexShardState.STARTED) { - throw new IndexShardNotStartedException(shardId, state); + try (Releasable block = suspendableRefContainer.blockAcquisition()) { + // no shard operation locks are being held here, move state from started to relocated + synchronized (mutex) { + if (state != IndexShardState.STARTED) { + throw new IndexShardNotStartedException(shardId, state); + } + changeState(IndexShardState.RELOCATED, reason); } - changeState(IndexShardState.RELOCATED, reason); } + return this; } @@ -796,7 +818,6 @@ public class IndexShard extends AbstractIndexShardComponent { refreshScheduledFuture = null; } changeState(IndexShardState.CLOSED, reason); - indexShardOperationCounter.decRef(); } finally { final Engine engine = this.currentEngineReference.getAndSet(null); try { @@ -810,7 +831,6 @@ public class IndexShard extends AbstractIndexShardComponent { } } - public IndexShard postRecovery(String reason) throws IndexShardStartedException, IndexShardRelocatedException, IndexShardClosedException { if (mapperService.hasMapping(PercolatorService.TYPE_NAME)) { refresh("percolator_load_queries"); @@ -967,16 +987,17 @@ public class IndexShard extends AbstractIndexShardComponent { IndexShardState state = this.state; // one time volatile read if (origin == Engine.Operation.Origin.PRIMARY) { - // for primaries, we only allow to write when actually started (so the cluster has decided we started) - // otherwise, we need to retry, we also want to still allow to index if we are relocated in case it fails - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED) { - throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering, origin [" + origin + "]"); + if (writeAllowedStatesForPrimary.contains(state) == false) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForPrimary + ", origin [" + origin + "]"); + } + } else if (origin == Engine.Operation.Origin.RECOVERY) { + if (state != IndexShardState.RECOVERING) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when recovering, origin [" + origin + "]"); } } else { - // for replicas, we allow to write also while recovering, since we index also during recovery to replicas - // and rely on version checks to make sure its consistent - if (state != IndexShardState.STARTED && state != IndexShardState.RELOCATED && state != IndexShardState.RECOVERING && state != IndexShardState.POST_RECOVERY) { - throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when started/recovering, origin [" + origin + "]"); + assert origin == Engine.Operation.Origin.REPLICA; + if (writeAllowedStatesForReplica.contains(state) == false) { + throw new IllegalIndexShardStateException(shardId, state, "operation only allowed when shard state is one of " + writeAllowedStatesForReplica + ", origin [" + origin + "]"); } } } @@ -995,7 +1016,7 @@ public class IndexShard extends AbstractIndexShardComponent { private void verifyNotClosed(Throwable suppressed) throws IllegalIndexShardStateException { IndexShardState state = this.state; // one time volatile read if (state == IndexShardState.CLOSED) { - final IllegalIndexShardStateException exc = new IllegalIndexShardStateException(shardId, state, "operation only allowed when not closed"); + final IllegalIndexShardStateException exc = new IndexShardClosedException(shardId, "operation only allowed when not closed"); if (suppressed != null) { exc.addSuppressed(suppressed); } @@ -1390,37 +1411,21 @@ public class IndexShard extends AbstractIndexShardComponent { idxSettings.getSettings().getAsTime(IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, IndexingMemoryController.SHARD_DEFAULT_INACTIVE_TIME)); } - private static class IndexShardOperationCounter extends AbstractRefCounted { - final private ESLogger logger; - private final ShardId shardId; - - public IndexShardOperationCounter(ESLogger logger, ShardId shardId) { - super("index-shard-operations-counter"); - this.logger = logger; - this.shardId = shardId; - } - - @Override - protected void closeInternal() { - logger.debug("operations counter reached 0, will not accept any further writes"); - } - - @Override - protected void alreadyClosed() { - throw new IndexShardClosedException(shardId, "could not increment operation counter. shard is closed."); + public Releasable acquirePrimaryOperationLock() { + verifyNotClosed(); + if (shardRouting.primary() == false) { + throw new IllegalIndexShardStateException(shardId, state, "shard is not a primary"); } + return suspendableRefContainer.acquireUninterruptibly(); } - public void incrementOperationCounter() { - indexShardOperationCounter.incRef(); + public Releasable acquireReplicaOperationLock() { + verifyNotClosed(); + return suspendableRefContainer.acquireUninterruptibly(); } - public void decrementOperationCounter() { - indexShardOperationCounter.decRef(); - } - - public int getOperationsCount() { - return Math.max(0, indexShardOperationCounter.refCount() - 1); // refCount is incremented on creation and decremented on close + public int getActiveOperationsCount() { + return suspendableRefContainer.activeRefs(); // refCount is incremented on creation and decremented on close } /** diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java index 2d3c48cd4c5..043ad892777 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShardRelocatedException.java @@ -29,10 +29,14 @@ import java.io.IOException; public class IndexShardRelocatedException extends IllegalIndexShardStateException { public IndexShardRelocatedException(ShardId shardId) { - super(shardId, IndexShardState.RELOCATED, "Already relocated"); + this(shardId, "Already relocated"); + } + + public IndexShardRelocatedException(ShardId shardId, String reason) { + super(shardId, IndexShardState.RELOCATED, reason); } public IndexShardRelocatedException(StreamInput in) throws IOException{ super(in); } -} \ No newline at end of file +} diff --git a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 8c2f23f7081..7fc12eb8bab 100644 --- a/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/core/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -492,7 +492,11 @@ public class IndicesClusterStateService extends AbstractLifecycleComponent> ongoingRecoveries = new HashMap<>(); synchronized void add(IndexShard shard, RecoverySourceHandler handler) { diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 8cbdfca0221..26c288cfbca 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -393,9 +393,11 @@ public class RecoverySourceHandler { } }); - - if (request.markAsRelocated()) { - // TODO what happens if the recovery process fails afterwards, we need to mark this back to started + if (isPrimaryRelocation()) { + /** + * if the recovery process fails after setting the shard state to RELOCATED, both relocation source and + * target are failed (see {@link IndexShard#updateRoutingEntry}). + */ try { shard.relocated("to " + request.targetNode()); } catch (IllegalIndexShardStateException e) { @@ -406,7 +408,11 @@ public class RecoverySourceHandler { } stopWatch.stop(); logger.trace("[{}][{}] finalizing recovery to {}: took [{}]", - indexName, shardId, request.targetNode(), stopWatch.totalTime()); + indexName, shardId, request.targetNode(), stopWatch.totalTime()); + } + + protected boolean isPrimaryRelocation() { + return request.recoveryType() == RecoveryState.Type.PRIMARY_RELOCATION; } /** diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 92bfc87218a..d1c41d4b932 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -101,7 +101,7 @@ public class RecoveryState implements ToXContent, Streamable { STORE((byte) 0), SNAPSHOT((byte) 1), REPLICA((byte) 2), - RELOCATION((byte) 3); + PRIMARY_RELOCATION((byte) 3); private static final Type[] TYPES = new Type[Type.values().length]; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 0912a22a0f5..727bd0b6441 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -138,7 +138,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe // create a new recovery status, and process... final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout()); threadPool.generic().execute(new RecoveryRunner(recoveryId)); - } protected void retryRecovery(final RecoveryStatus recoveryStatus, final Throwable reason, TimeValue retryAfter, final StartRecoveryRequest currentRequest) { @@ -178,7 +177,7 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe return; } final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), - false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); + metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId()); final AtomicReference responseHolder = new AtomicReference<>(); try { @@ -267,7 +266,6 @@ public class RecoveryTarget extends AbstractComponent implements IndexEventListe onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, "source shard is closed", cause), false); return; } - onGoingRecoveries.failRecovery(recoveryStatus.recoveryId(), new RecoveryFailedException(request, e), true); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java index 16bd1d46553..8d75c474791 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/SharedFSRecoverySourceHandler.java @@ -84,8 +84,4 @@ public class SharedFSRecoverySourceHandler extends RecoverySourceHandler { return 0; } - private boolean isPrimaryRelocation() { - return request.recoveryType() == RecoveryState.Type.RELOCATION && shard.routingEntry().primary(); - } - } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java index 3a62f4f6352..49dd70a73f7 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/StartRecoveryRequest.java @@ -41,8 +41,6 @@ public class StartRecoveryRequest extends TransportRequest { private DiscoveryNode targetNode; - private boolean markAsRelocated; - private Store.MetadataSnapshot metadataSnapshot; private RecoveryState.Type recoveryType; @@ -56,12 +54,11 @@ public class StartRecoveryRequest extends TransportRequest { * @param sourceNode The node to recover from * @param targetNode The node to recover to */ - public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, boolean markAsRelocated, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) { + public StartRecoveryRequest(ShardId shardId, DiscoveryNode sourceNode, DiscoveryNode targetNode, Store.MetadataSnapshot metadataSnapshot, RecoveryState.Type recoveryType, long recoveryId) { this.recoveryId = recoveryId; this.shardId = shardId; this.sourceNode = sourceNode; this.targetNode = targetNode; - this.markAsRelocated = markAsRelocated; this.recoveryType = recoveryType; this.metadataSnapshot = metadataSnapshot; } @@ -82,10 +79,6 @@ public class StartRecoveryRequest extends TransportRequest { return targetNode; } - public boolean markAsRelocated() { - return markAsRelocated; - } - public RecoveryState.Type recoveryType() { return recoveryType; } @@ -101,7 +94,6 @@ public class StartRecoveryRequest extends TransportRequest { shardId = ShardId.readShardId(in); sourceNode = DiscoveryNode.readNode(in); targetNode = DiscoveryNode.readNode(in); - markAsRelocated = in.readBoolean(); metadataSnapshot = new Store.MetadataSnapshot(in); recoveryType = RecoveryState.Type.fromId(in.readByte()); @@ -114,7 +106,6 @@ public class StartRecoveryRequest extends TransportRequest { shardId.writeTo(out); sourceNode.writeTo(out); targetNode.writeTo(out); - out.writeBoolean(markAsRelocated); metadataSnapshot.writeTo(out); out.writeByte(recoveryType.id()); } diff --git a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java index 8c042cd1937..69fc73e4af0 100644 --- a/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java +++ b/core/src/main/java/org/elasticsearch/transport/TransportChannelResponseHandler.java @@ -23,6 +23,7 @@ import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; +import java.util.function.Supplier; /** * Base class for delegating transport response to a transport channel @@ -30,7 +31,7 @@ import java.io.IOException; public abstract class TransportChannelResponseHandler implements TransportResponseHandler { /** - * Convenience method for delegating an empty response to the provided changed + * Convenience method for delegating an empty response to the provided transport channel */ public static TransportChannelResponseHandler emptyResponseHandler(ESLogger logger, TransportChannel channel, String extraInfoOnError) { return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { @@ -41,6 +42,19 @@ public abstract class TransportChannelResponseHandler TransportChannelResponseHandler responseHandler(ESLogger logger, Supplier responseSupplier, TransportChannel channel, String extraInfoOnError) { + return new TransportChannelResponseHandler(logger, channel, extraInfoOnError) { + @Override + public T newInstance() { + return responseSupplier.get(); + } + }; + } + + private final ESLogger logger; private final TransportChannel channel; private final String extraInfoOnError; diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java index 49a5e072e1f..8e7b70a3b21 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/ClusterStateCreationUtils.java @@ -56,12 +56,12 @@ public class ClusterStateCreationUtils { /** * Creates cluster state with and index that has one shard and #(replicaStates) replicas * - * @param index name of the index - * @param primaryLocal if primary should coincide with the local node in the cluster state - * @param primaryState state of primary - * @param replicaStates states of the replicas. length of this array determines also the number of replicas + * @param index name of the index + * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state + * @param primaryState state of primary + * @param replicaStates states of the replicas. length of this array determines also the number of replicas */ - public static ClusterState state(String index, boolean primaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { + public static ClusterState state(String index, boolean activePrimaryLocal, ShardRoutingState primaryState, ShardRoutingState... replicaStates) { final int numberOfReplicas = replicaStates.length; int numberOfNodes = numberOfReplicas + 1; @@ -97,7 +97,7 @@ public class ClusterStateCreationUtils { String relocatingNode = null; UnassignedInfo unassignedInfo = null; if (primaryState != ShardRoutingState.UNASSIGNED) { - if (primaryLocal) { + if (activePrimaryLocal) { primaryNode = newNode(0).id(); unassignedNodes.remove(primaryNode); } else { @@ -173,13 +173,13 @@ public class ClusterStateCreationUtils { * Creates cluster state with and index that has one shard and as many replicas as numberOfReplicas. * Primary will be STARTED in cluster state but replicas will be one of UNASSIGNED, INITIALIZING, STARTED or RELOCATING. * - * @param index name of the index - * @param primaryLocal if primary should coincide with the local node in the cluster state - * @param numberOfReplicas number of replicas + * @param index name of the index + * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state + * @param numberOfReplicas number of replicas */ - public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int numberOfReplicas) { + public static ClusterState stateWithActivePrimary(String index, boolean activePrimaryLocal, int numberOfReplicas) { int assignedReplicas = randomIntBetween(0, numberOfReplicas); - return stateWithStartedPrimary(index, primaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); + return stateWithActivePrimary(index, activePrimaryLocal, assignedReplicas, numberOfReplicas - assignedReplicas); } /** @@ -188,11 +188,11 @@ public class ClusterStateCreationUtils { * some (assignedReplicas) will be one of INITIALIZING, STARTED or RELOCATING. * * @param index name of the index - * @param primaryLocal if primary should coincide with the local node in the cluster state + * @param activePrimaryLocal if active primary should coincide with the local node in the cluster state * @param assignedReplicas number of replicas that should have INITIALIZING, STARTED or RELOCATING state * @param unassignedReplicas number of replicas that should be unassigned */ - public static ClusterState stateWithStartedPrimary(String index, boolean primaryLocal, int assignedReplicas, int unassignedReplicas) { + public static ClusterState stateWithActivePrimary(String index, boolean activePrimaryLocal, int assignedReplicas, int unassignedReplicas) { ShardRoutingState[] replicaStates = new ShardRoutingState[assignedReplicas + unassignedReplicas]; // no point in randomizing - node assignment later on does it too. for (int i = 0; i < assignedReplicas; i++) { @@ -201,7 +201,7 @@ public class ClusterStateCreationUtils { for (int i = assignedReplicas; i < replicaStates.length; i++) { replicaStates[i] = ShardRoutingState.UNASSIGNED; } - return state(index, primaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); + return state(index, activePrimaryLocal, randomFrom(ShardRoutingState.STARTED, ShardRoutingState.RELOCATING), replicaStates); } /** diff --git a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 402a454649b..4542be25485 100644 --- a/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/core/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; @@ -75,9 +76,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; -import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; +import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithActivePrimary; import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.empty; @@ -225,7 +227,7 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - clusterService.setState(stateWithStartedPrimary(index, randomBoolean(), 3)); + clusterService.setState(stateWithActivePrimary(index, randomBoolean(), 3)); logger.debug("using state: \n{}", clusterService.state().prettyPrint()); @@ -249,33 +251,73 @@ public class TransportReplicationActionTests extends ESTestCase { assertIndexShardUninitialized(); } - public void testPrimaryPhaseExecutesRequest() throws InterruptedException, ExecutionException { + public void testPrimaryPhaseExecutesOrDelegatesRequestToRelocationTarget() throws InterruptedException, ExecutionException { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - clusterService.setState(state(index, true, ShardRoutingState.STARTED, ShardRoutingState.STARTED)); + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); + clusterService.setState(state); Request request = new Request(shardId).timeout("1ms"); PlainActionFuture listener = new PlainActionFuture<>(); - TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)); + AtomicBoolean movedToReplication = new AtomicBoolean(); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) { + @Override + void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) { + super.finishAndMoveToReplication(replicationPhase); + movedToReplication.set(true); + } + }; + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + boolean executeOnPrimary = true; + if (primaryShard.relocating() && randomBoolean()) { // whether shard has been marked as relocated already (i.e. relocation completed) + isRelocated.set(true); + indexShardRouting.set(primaryShard); + executeOnPrimary = false; + } primaryPhase.run(); - assertThat("request was not processed on primary", request.processedOnPrimary.get(), equalTo(true)); - final String replicaNodeId = clusterService.state().getRoutingTable().shardRoutingTable(index, shardId.id()).replicaShards().get(0).currentNodeId(); - final List requests = transport.getCapturedRequestsByTargetNodeAndClear().get(replicaNodeId); - assertThat(requests, notNullValue()); - assertThat(requests.size(), equalTo(1)); - assertThat("replica request was not sent", requests.get(0).action, equalTo("testAction[r]")); + assertThat(request.processedOnPrimary.get(), equalTo(executeOnPrimary)); + assertThat(movedToReplication.get(), equalTo(executeOnPrimary)); + if (executeOnPrimary == false) { + final List requests = transport.capturedRequestsByTargetNode().get(primaryShard.relocatingNodeId()); + assertThat(requests, notNullValue()); + assertThat(requests.size(), equalTo(1)); + assertThat("primary request was not delegated to relocation target", requests.get(0).action, equalTo("testAction[p]")); + } + } + + public void testPrimaryPhaseExecutesDelegatedRequestOnRelocationTarget() throws InterruptedException, ExecutionException { + final String index = "test"; + final ShardId shardId = new ShardId(index, "_na_", 0); + ClusterState state = state(index, true, ShardRoutingState.RELOCATING); + String primaryTargetNodeId = state.getRoutingTable().shardRoutingTable(shardId).primaryShard().relocatingNodeId(); + // simulate execution of the primary phase on the relocation target node + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryTargetNodeId)).build(); + clusterService.setState(state); + Request request = new Request(shardId).timeout("1ms"); + PlainActionFuture listener = new PlainActionFuture<>(); + AtomicBoolean movedToReplication = new AtomicBoolean(); + TransportReplicationAction.PrimaryPhase primaryPhase = action.new PrimaryPhase(request, createTransportChannel(listener)) { + @Override + void finishAndMoveToReplication(TransportReplicationAction.ReplicationPhase replicationPhase) { + super.finishAndMoveToReplication(replicationPhase); + movedToReplication.set(true); + } + }; + primaryPhase.run(); + assertThat("request was not processed on primary relocation target", request.processedOnPrimary.get(), equalTo(true)); + assertThat(movedToReplication.get(), equalTo(true)); } public void testAddedReplicaAfterPrimaryOperation() { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // start with no replicas - clusterService.setState(stateWithStartedPrimary(index, true, 0)); + clusterService.setState(stateWithActivePrimary(index, true, 0)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); final ClusterState stateWithAddedReplicas = state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED); final Action actionWithAddedReplicaAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); // add replicas after primary operation ((TestClusterService) clusterService).setState(stateWithAddedReplicas); @@ -302,13 +344,13 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); // start with a replica - clusterService.setState(state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED)); + clusterService.setState(state(index, true, ShardRoutingState.STARTED, randomBoolean() ? ShardRoutingState.INITIALIZING : ShardRoutingState.STARTED)); logger.debug("--> using initial state:\n{}", clusterService.state().prettyPrint()); final ClusterState stateWithRelocatingReplica = state(index, true, ShardRoutingState.STARTED, ShardRoutingState.RELOCATING); final Action actionWithRelocatingReplicasAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); // set replica to relocating ((TestClusterService) clusterService).setState(stateWithRelocatingReplica); @@ -341,7 +383,7 @@ public class TransportReplicationActionTests extends ESTestCase { final Action actionWithDeletedIndexAfterPrimaryOp = new Action(Settings.EMPTY, "testAction", transportService, clusterService, threadPool) { @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { final Tuple operationOnPrimary = super.shardOperationOnPrimary(metaData, shardRequest); // delete index after primary op ((TestClusterService) clusterService).setState(stateWithDeletedIndex); @@ -432,7 +474,13 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the replication phase on the relocation target node after relocation source was marked as relocated + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + } + clusterService.setState(state); final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); int assignedReplicas = 0; @@ -455,12 +503,19 @@ public class TransportReplicationActionTests extends ESTestCase { final String index = "test"; final ShardId shardId = new ShardId(index, "_na_", 0); - ClusterState state = stateWithStartedPrimary(index, true, randomInt(5)); + ClusterState state = stateWithActivePrimary(index, true, randomInt(5)); MetaData.Builder metaData = MetaData.builder(state.metaData()); Settings.Builder settings = Settings.builder().put(metaData.get(index).getSettings()); settings.put(IndexMetaData.SETTING_SHADOW_REPLICAS, true); metaData.put(IndexMetaData.builder(metaData.get(index)).settings(settings)); - clusterService.setState(ClusterState.builder(state).metaData(metaData)); + state = ClusterState.builder(state).metaData(metaData).build(); + + ShardRouting primaryShard = state.getRoutingTable().shardRoutingTable(shardId).primaryShard(); + if (primaryShard.relocating() && randomBoolean()) { + // simulate execution of the primary phase on the relocation target node + state = ClusterState.builder(state).nodes(DiscoveryNodes.builder(state.nodes()).localNodeId(primaryShard.relocatingNodeId())).build(); + } + clusterService.setState(state); final IndexShardRoutingTable shardRoutingTable = clusterService.state().routingTable().index(index).shard(shardId.id()); int assignedReplicas = 0; @@ -507,8 +562,9 @@ public class TransportReplicationActionTests extends ESTestCase { assertEquals(request.shardId, replicationRequest.shardId); } + String localNodeId = clusterService.state().getNodes().localNodeId(); // no request was sent to the local node - assertThat(nodesSentTo.keySet(), not(hasItem(clusterService.state().getNodes().localNodeId()))); + assertThat(nodesSentTo.keySet(), not(hasItem(localNodeId))); // requests were sent to the correct shard copies for (ShardRouting shard : clusterService.state().getRoutingTable().shardRoutingTable(shardId)) { @@ -518,11 +574,11 @@ public class TransportReplicationActionTests extends ESTestCase { if (shard.unassigned()) { continue; } - if (shard.primary() == false) { - nodesSentTo.remove(shard.currentNodeId()); + if (localNodeId.equals(shard.currentNodeId()) == false) { + assertThat(nodesSentTo.remove(shard.currentNodeId()), notNullValue()); } - if (shard.relocating()) { - nodesSentTo.remove(shard.relocatingNodeId()); + if (shard.relocating() && localNodeId.equals(shard.relocatingNodeId()) == false) { // for relocating primaries, we replicate from target to source if source is marked as relocated + assertThat(nodesSentTo.remove(shard.relocatingNodeId()), notNullValue()); } } @@ -629,6 +685,7 @@ public class TransportReplicationActionTests extends ESTestCase { // shard operation should be ongoing, so the counter is at 2 // we have to wait here because increment happens in thread assertBusy(() -> assertIndexShardCounter(2)); + assertThat(transport.capturedRequests().length, equalTo(0)); ((ActionWithDelay) action).countDownLatch.countDown(); t.join(); @@ -726,12 +783,28 @@ public class TransportReplicationActionTests extends ESTestCase { private final AtomicInteger count = new AtomicInteger(0); + private final AtomicBoolean isRelocated = new AtomicBoolean(false); + + private final AtomicReference indexShardRouting = new AtomicReference<>(); + /* * Returns testIndexShardOperationsCounter or initializes it if it was already created in this test run. * */ - private synchronized Releasable getOrCreateIndexShardOperationsCounter() { + private synchronized TransportReplicationAction.IndexShardReference getOrCreateIndexShardOperationsCounter() { count.incrementAndGet(); - return new Releasable() { + return new TransportReplicationAction.IndexShardReference() { + @Override + public boolean isRelocated() { + return isRelocated.get(); + } + + @Override + public ShardRouting routingEntry() { + ShardRouting shardRouting = indexShardRouting.get(); + assert shardRouting != null; + return shardRouting; + } + @Override public void close() { count.decrementAndGet(); @@ -783,7 +856,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; return new Tuple<>(new Response(), shardRequest); @@ -805,7 +878,11 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Releasable getIndexShardOperationsCounter(ShardId shardId) { + protected IndexShardReference getIndexShardReferenceOnPrimary(ShardId shardId) { + return getOrCreateIndexShardOperationsCounter(); + } + + protected IndexShardReference getIndexShardReferenceOnReplica(ShardId shardId) { return getOrCreateIndexShardOperationsCounter(); } } @@ -832,7 +909,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) { return throwException(shardRequest.shardId()); } @@ -870,7 +947,7 @@ public class TransportReplicationActionTests extends ESTestCase { } @Override - protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Throwable { + protected Tuple shardOperationOnPrimary(MetaData metaData, Request shardRequest) throws Exception { awaitLatch(); return new Tuple<>(new Response(), shardRequest); } diff --git a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java index 8a13e6e6ddd..b6e69b27a5a 100644 --- a/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java +++ b/core/src/test/java/org/elasticsearch/cluster/action/shard/ShardStateActionTests.java @@ -20,6 +20,7 @@ package org.elasticsearch.cluster.action.shard; import org.apache.lucene.index.CorruptIndexException; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateObserver; @@ -33,7 +34,6 @@ import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.discovery.Discovery; -import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.cluster.TestClusterService; import org.elasticsearch.test.transport.CapturingTransport; @@ -55,7 +55,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongConsumer; -import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.stateWithStartedPrimary; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -127,7 +126,7 @@ public class ShardStateActionTests extends ESTestCase { public void testSuccess() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); @@ -169,7 +168,7 @@ public class ShardStateActionTests extends ESTestCase { public void testNoMaster() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); DiscoveryNodes.Builder noMasterBuilder = DiscoveryNodes.builder(clusterService.state().nodes()); noMasterBuilder.masterNodeId(null); @@ -207,7 +206,7 @@ public class ShardStateActionTests extends ESTestCase { public void testMasterChannelException() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); @@ -264,7 +263,7 @@ public class ShardStateActionTests extends ESTestCase { public void testUnhandledFailure() { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); @@ -294,7 +293,7 @@ public class ShardStateActionTests extends ESTestCase { public void testShardNotFound() throws InterruptedException { final String index = "test"; - clusterService.setState(stateWithStartedPrimary(index, true, randomInt(5))); + clusterService.setState(ClusterStateCreationUtils.stateWithActivePrimary(index, true, randomInt(5))); String indexUUID = clusterService.state().metaData().index(index).getIndexUUID(); diff --git a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 9a4e6a814a3..5c93bbb7d5d 100644 --- a/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/core/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -57,6 +57,8 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.io.stream.BytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.Settings; @@ -108,6 +110,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -125,6 +128,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitC import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertSearchHits; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; /** * Simple unit-test IndexShard related operations. @@ -316,36 +320,41 @@ public class IndexShardTests extends ESSingleNodeTestCase { } - public void testDeleteIndexDecreasesCounter() throws InterruptedException, ExecutionException, IOException { + public void testDeleteIndexPreventsNewOperations() throws InterruptedException, ExecutionException, IOException { assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe("test"); IndexShard indexShard = indexService.getShardOrNull(0); client().admin().indices().prepareDelete("test").get(); - assertThat(indexShard.getOperationsCount(), equalTo(0)); + assertThat(indexShard.getActiveOperationsCount(), equalTo(0)); try { - indexShard.incrementOperationCounter(); + indexShard.acquirePrimaryOperationLock(); + fail("we should not be able to increment anymore"); + } catch (IndexShardClosedException e) { + // expected + } + try { + indexShard.acquireReplicaOperationLock(); fail("we should not be able to increment anymore"); } catch (IndexShardClosedException e) { // expected } } - public void testIndexShardCounter() throws InterruptedException, ExecutionException, IOException { + public void testIndexOperationsCounter() throws InterruptedException, ExecutionException, IOException { assertAcked(client().admin().indices().prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)).get()); ensureGreen("test"); IndicesService indicesService = getInstanceFromNode(IndicesService.class); IndexService indexService = indicesService.indexServiceSafe("test"); IndexShard indexShard = indexService.getShardOrNull(0); - assertEquals(0, indexShard.getOperationsCount()); - indexShard.incrementOperationCounter(); - assertEquals(1, indexShard.getOperationsCount()); - indexShard.incrementOperationCounter(); - assertEquals(2, indexShard.getOperationsCount()); - indexShard.decrementOperationCounter(); - indexShard.decrementOperationCounter(); - assertEquals(0, indexShard.getOperationsCount()); + assertEquals(0, indexShard.getActiveOperationsCount()); + Releasable operation1 = indexShard.acquirePrimaryOperationLock(); + assertEquals(1, indexShard.getActiveOperationsCount()); + Releasable operation2 = indexShard.acquirePrimaryOperationLock(); + assertEquals(2, indexShard.getActiveOperationsCount()); + Releasables.close(operation1, operation2); + assertEquals(0, indexShard.getActiveOperationsCount()); } public void testMarkAsInactiveTriggersSyncedFlush() throws Exception { @@ -777,6 +786,89 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertEquals(total + 1, shard.flushStats().getTotal()); } + public void testLockingBeforeAndAfterRelocated() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + ).get()); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + CountDownLatch latch = new CountDownLatch(1); + Thread recoveryThread = new Thread(() -> { + latch.countDown(); + shard.relocated("simulated recovery"); + }); + + try (Releasable ignored = shard.acquirePrimaryOperationLock()) { + // start finalization of recovery + recoveryThread.start(); + latch.await(); + // recovery can only be finalized after we release the current primaryOperationLock + assertThat(shard.state(), equalTo(IndexShardState.STARTED)); + } + // recovery can be now finalized + recoveryThread.join(); + assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + try (Releasable ignored = shard.acquirePrimaryOperationLock()) { + // lock can again be acquired + assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + } + } + + public void testStressRelocated() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").setSettings( + Settings.builder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + ).get()); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + final int numThreads = randomIntBetween(2, 4); + Thread[] indexThreads = new Thread[numThreads]; + CountDownLatch somePrimaryOperationLockAcquired = new CountDownLatch(1); + CyclicBarrier barrier = new CyclicBarrier(numThreads + 1); + for (int i = 0; i < indexThreads.length; i++) { + indexThreads[i] = new Thread() { + @Override + public void run() { + try (Releasable operationLock = shard.acquirePrimaryOperationLock()) { + somePrimaryOperationLockAcquired.countDown(); + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + throw new RuntimeException(e); + } + } + }; + indexThreads[i].start(); + } + AtomicBoolean relocated = new AtomicBoolean(); + final Thread recoveryThread = new Thread(() -> { + shard.relocated("simulated recovery"); + relocated.set(true); + }); + // ensure we wait for at least one primary operation lock to be acquired + somePrimaryOperationLockAcquired.await(); + // start recovery thread + recoveryThread.start(); + assertThat(relocated.get(), equalTo(false)); + assertThat(shard.getActiveOperationsCount(), greaterThan(0)); + // ensure we only transition to RELOCATED state after pending operations completed + assertThat(shard.state(), equalTo(IndexShardState.STARTED)); + // complete pending operations + barrier.await(); + // complete recovery/relocation + recoveryThread.join(); + // ensure relocated successfully once pending operations are done + assertThat(relocated.get(), equalTo(true)); + assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + assertThat(shard.getActiveOperationsCount(), equalTo(0)); + + for (Thread indexThread : indexThreads) { + indexThread.join(); + } + } + public void testRecoverFromStore() throws IOException { createIndex("test"); ensureGreen(); @@ -857,6 +949,27 @@ public class IndexShardTests extends ESSingleNodeTestCase { assertHitCount(client().prepareSearch().get(), 1); } + public void testRecoveryFailsAfterMovingToRelocatedState() throws InterruptedException { + createIndex("test"); + ensureGreen(); + IndicesService indicesService = getInstanceFromNode(IndicesService.class); + IndexService test = indicesService.indexService("test"); + final IndexShard shard = test.getShardOrNull(0); + ShardRouting origRouting = shard.routingEntry(); + assertThat(shard.state(), equalTo(IndexShardState.STARTED)); + ShardRouting inRecoveryRouting = new ShardRouting(origRouting); + ShardRoutingHelper.relocate(inRecoveryRouting, "some_node"); + shard.updateRoutingEntry(inRecoveryRouting, true); + shard.relocated("simulate mark as relocated"); + assertThat(shard.state(), equalTo(IndexShardState.RELOCATED)); + ShardRouting failedRecoveryRouting = new ShardRouting(origRouting); + try { + shard.updateRoutingEntry(failedRecoveryRouting, true); + fail("Expected IndexShardRelocatedException"); + } catch (IndexShardRelocatedException expected) { + } + } + public void testRestoreShard() throws IOException { createIndex("test"); createIndex("test_target"); diff --git a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java index f1f8a8222cb..b074729cdff 100644 --- a/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java +++ b/core/src/test/java/org/elasticsearch/indices/IndicesLifecycleListenerIT.java @@ -58,6 +58,7 @@ import static org.elasticsearch.index.shard.IndexShardState.CLOSED; import static org.elasticsearch.index.shard.IndexShardState.CREATED; import static org.elasticsearch.index.shard.IndexShardState.POST_RECOVERY; import static org.elasticsearch.index.shard.IndexShardState.RECOVERING; +import static org.elasticsearch.index.shard.IndexShardState.RELOCATED; import static org.elasticsearch.index.shard.IndexShardState.STARTED; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.CoreMatchers.equalTo; @@ -181,7 +182,7 @@ public class IndicesLifecycleListenerIT extends ESIntegTestCase { ensureGreen(); //the 3 relocated shards get closed on the first node - assertShardStatesMatch(stateChangeListenerNode1, 3, CLOSED); + assertShardStatesMatch(stateChangeListenerNode1, 3, RELOCATED, CLOSED); //the 3 relocated shards get created on the second node assertShardStatesMatch(stateChangeListenerNode2, 3, CREATED, RECOVERING, POST_RECOVERY, STARTED); diff --git a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java index c30a5adaaca..239cb7a9096 100644 --- a/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java +++ b/core/src/test/java/org/elasticsearch/indices/flush/SyncedFlushSingleNodeTests.java @@ -23,6 +23,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; @@ -110,8 +111,7 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { SyncedFlushService flushService = getInstanceFromNode(SyncedFlushService.class); final ShardId shardId = shard.shardId(); - shard.incrementOperationCounter(); - try { + try (Releasable operationLock = shard.acquirePrimaryOperationLock()) { SyncedFlushUtil.LatchedListener listener = new SyncedFlushUtil.LatchedListener<>(); flushService.attemptSyncedFlush(shardId, listener); listener.latch.await(); @@ -121,8 +121,6 @@ public class SyncedFlushSingleNodeTests extends ESSingleNodeTestCase { assertEquals(0, syncedFlushResult.successfulShards()); assertNotEquals(0, syncedFlushResult.totalShards()); assertEquals("[1] ongoing operations on primary", syncedFlushResult.failureReason()); - } finally { - shard.decrementOperationCounter(); } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java new file mode 100644 index 00000000000..727641eb224 --- /dev/null +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexPrimaryRelocationIT.java @@ -0,0 +1,89 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.indices.recovery; + +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.delete.DeleteResponse; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; +import org.elasticsearch.test.junit.annotations.TestLogging; + +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.hamcrest.Matchers.equalTo; + +@TestLogging("_root:DEBUG") +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST) +public class IndexPrimaryRelocationIT extends ESIntegTestCase { + + private static final int RELOCATION_COUNT = 25; + + public void testPrimaryRelocationWhileIndexing() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(randomIntBetween(2, 3)); + client().admin().indices().prepareCreate("test") + .setSettings(Settings.settingsBuilder().put("index.number_of_shards", 1).put("index.number_of_replicas", 0)) + .addMapping("type", "field", "type=string") + .get(); + ensureGreen("test"); + + final AtomicBoolean finished = new AtomicBoolean(false); + Thread indexingThread = new Thread() { + @Override + public void run() { + while (finished.get() == false) { + IndexResponse indexResponse = client().prepareIndex("test", "type", "id").setSource("field", "value").get(); + assertThat("deleted document was found", indexResponse.isCreated(), equalTo(true)); + DeleteResponse deleteResponse = client().prepareDelete("test", "type", "id").get(); + assertThat("indexed document was not found", deleteResponse.isFound(), equalTo(true)); + } + } + }; + indexingThread.start(); + + ClusterState initialState = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode[] dataNodes = initialState.getNodes().dataNodes().values().toArray(DiscoveryNode.class); + DiscoveryNode relocationSource = initialState.getNodes().dataNodes().get(initialState.getRoutingTable().shardRoutingTable("test", 0).primaryShard().currentNodeId()); + for (int i = 0; i < RELOCATION_COUNT; i++) { + DiscoveryNode relocationTarget = randomFrom(dataNodes); + while (relocationTarget.equals(relocationSource)) { + relocationTarget = randomFrom(dataNodes); + } + logger.info("--> [iteration {}] relocating from {} to {} ", i, relocationSource.getName(), relocationTarget.getName()); + client().admin().cluster().prepareReroute() + .add(new MoveAllocationCommand("test", 0, relocationSource.getId(), relocationTarget.getId())) + .execute().actionGet(); + ClusterHealthResponse clusterHealthResponse = client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForRelocatingShards(0).execute().actionGet(); + assertThat(clusterHealthResponse.isTimedOut(), equalTo(false)); + logger.info("--> [iteration {}] relocation complete", i); + relocationSource = relocationTarget; + if (indexingThread.isAlive() == false) { // indexing process aborted early, no need for more relocations as test has already failed + break; + } + + } + finished.set(true); + indexingThread.join(); + } +} diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 12acea4f9ac..cc11cb82057 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -286,7 +286,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.STORE, Stage.DONE, nodeA, nodeA, false); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, nodeA, nodeB, false); + assertOnGoingRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, nodeA, nodeB, false); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); logger.info("--> request node recovery stats"); @@ -339,7 +339,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { recoveryStates = response.shardRecoveryStates().get(INDEX_NAME); assertThat(recoveryStates.size(), equalTo(1)); - assertRecoveryState(recoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + assertRecoveryState(recoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false); validateIndexRecoveryState(recoveryStates.get(0).getIndex()); statsResponse = client().admin().cluster().prepareNodesStats().clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Recovery)).get(); @@ -400,7 +400,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { assertRecoveryState(nodeARecoveryStates.get(0), 0, Type.REPLICA, Stage.DONE, nodeB, nodeA, false); validateIndexRecoveryState(nodeARecoveryStates.get(0).getIndex()); - assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) @@ -421,7 +421,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { nodeCRecoveryStates = findRecoveriesForTargetNode(nodeC, recoveryStates); assertThat(nodeCRecoveryStates.size(), equalTo(1)); - assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.RELOCATION, Stage.DONE, nodeA, nodeB, false); + assertRecoveryState(nodeBRecoveryStates.get(0), 0, Type.PRIMARY_RELOCATION, Stage.DONE, nodeA, nodeB, false); validateIndexRecoveryState(nodeBRecoveryStates.get(0).getIndex()); // relocations of replicas are marked as REPLICA and the source node is the node holding the primary (B) @@ -503,7 +503,7 @@ public class IndexRecoveryIT extends ESIntegTestCase { final IndexRequestBuilder[] docs = new IndexRequestBuilder[numDocs]; for (int i = 0; i < numDocs; i++) { - docs[i] = client().prepareIndex(INDEX_NAME, INDEX_TYPE). + docs[i] = client().prepareIndex(name, INDEX_TYPE). setSource("foo-int", randomInt(), "foo-string", randomAsciiOfLength(32), "foo-float", randomFloat()); @@ -511,8 +511,8 @@ public class IndexRecoveryIT extends ESIntegTestCase { indexRandom(true, docs); flush(); - assertThat(client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); - return client().admin().indices().prepareStats(INDEX_NAME).execute().actionGet(); + assertThat(client().prepareSearch(name).setSize(0).get().getHits().totalHits(), equalTo((long) numDocs)); + return client().admin().indices().prepareStats(name).execute().actionGet(); } private void validateIndexRecoveryState(RecoveryState.Index indexState) { diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java index c8cad5be296..b29404d59b6 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/RecoverySourceHandlerTests.java @@ -69,7 +69,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), - randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); + null, RecoveryState.Type.STORE, randomLong()); Store store = newStore(createTempDir()); RecoverySourceHandler handler = new RecoverySourceHandler(null, request, recoverySettings, null, logger); Directory dir = store.directory(); @@ -118,7 +118,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), - randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); + null, RecoveryState.Type.STORE, randomLong()); Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); @@ -181,7 +181,7 @@ public class RecoverySourceHandlerTests extends ESTestCase { StartRecoveryRequest request = new StartRecoveryRequest(shardId, new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), new DiscoveryNode("b", DummyTransportAddress.INSTANCE, Version.CURRENT), - randomBoolean(), null, RecoveryState.Type.STORE, randomLong()); + null, RecoveryState.Type.STORE, randomLong()); Path tempDir = createTempDir(); Store store = newStore(tempDir, false); AtomicBoolean failedEngine = new AtomicBoolean(false); diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java index c7a7852e426..3406388bd5b 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/StartRecoveryRequestTests.java @@ -43,11 +43,9 @@ public class StartRecoveryRequestTests extends ESTestCase { new ShardId("test", "_na_", 0), new DiscoveryNode("a", new LocalTransportAddress("1"), targetNodeVersion), new DiscoveryNode("b", new LocalTransportAddress("1"), targetNodeVersion), - true, Store.MetadataSnapshot.EMPTY, - RecoveryState.Type.RELOCATION, + RecoveryState.Type.PRIMARY_RELOCATION, 1L - ); ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); OutputStreamStreamOutput out = new OutputStreamStreamOutput(outBuffer); @@ -63,7 +61,6 @@ public class StartRecoveryRequestTests extends ESTestCase { assertThat(outRequest.shardId(), equalTo(inRequest.shardId())); assertThat(outRequest.sourceNode(), equalTo(inRequest.sourceNode())); assertThat(outRequest.targetNode(), equalTo(inRequest.targetNode())); - assertThat(outRequest.markAsRelocated(), equalTo(inRequest.markAsRelocated())); assertThat(outRequest.metadataSnapshot().asMap(), equalTo(inRequest.metadataSnapshot().asMap())); assertThat(outRequest.recoveryId(), equalTo(inRequest.recoveryId())); assertThat(outRequest.recoveryType(), equalTo(inRequest.recoveryType())); diff --git a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java index 61dca3f37af..8c6b71c9eac 100644 --- a/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java +++ b/core/src/test/java/org/elasticsearch/recovery/FullRollingRestartIT.java @@ -151,7 +151,7 @@ public class FullRollingRestartIT extends ESIntegTestCase { ClusterState state = client().admin().cluster().prepareState().get().getState(); RecoveryResponse recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { - assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION); + assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode() + "\n" + state.prettyPrint(), recoveryState.getType() != RecoveryState.Type.PRIMARY_RELOCATION); } internalCluster().restartRandomDataNode(); ensureGreen(); @@ -159,7 +159,7 @@ public class FullRollingRestartIT extends ESIntegTestCase { recoveryResponse = client().admin().indices().prepareRecoveries("test").get(); for (RecoveryState recoveryState : recoveryResponse.shardRecoveryStates().get("test")) { - assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.RELOCATION); + assertTrue("relocated from: " + recoveryState.getSourceNode() + " to: " + recoveryState.getTargetNode()+ "-- \nbefore: \n" + state.prettyPrint() + "\nafter: \n" + afterState.prettyPrint(), recoveryState.getType() != RecoveryState.Type.PRIMARY_RELOCATION); } } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 658264864e0..d9f634d503e 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1036,7 +1036,7 @@ public final class InternalTestCluster extends TestCluster { IndicesService indexServices = getInstance(IndicesService.class, nodeAndClient.name); for (IndexService indexService : indexServices) { for (IndexShard indexShard : indexService) { - assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getOperationsCount(), equalTo(0)); + assertThat("index shard counter on shard " + indexShard.shardId() + " on node " + nodeAndClient.name + " not 0", indexShard.getActiveOperationsCount(), equalTo(0)); } } }