From 6e6d9eb25514f8e533ba7973f69a2aaf09de5a88 Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 29 Dec 2016 10:58:15 +0100 Subject: [PATCH] Use a fresh recovery id when retrying recoveries (#22325) Recoveries are tracked on the target node using RecoveryTarget objects that are kept in a RecoveriesCollection. Each recovery has a unique id that is communicated from the recovery target to the source so that it can call back to the target and execute actions using the right recovery context. In case of a network disconnect, recoveries are retried. At the moment, the same recovery id is reused for the restarted recovery. This can lead to confusion though if the disconnect is unilateral and the recovery source continues with the recovery process. If the target reuses the same recovery id while doing a second attempt, there might be two concurrent recoveries running on the source for the same target. This commit changes the recovery retry process to use a fresh recovery id. It also waits for the first recovery attempt to be fully finished (all resources locally freed) to further prevent concurrent access to the shard. Finally, in case of primary relocation, it also fails a second recovery attempt if the first attempt moved past the finalization step, as the relocation source can then be moved to RELOCATED state and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing documents indexed and acknowledged before the reset. Relates to #22043 --- .../recovery/PeerRecoveryTargetService.java | 195 ++++++++++-------- .../recovery/RecoveriesCollection.java | 93 ++++++--- .../indices/recovery/RecoverySettings.java | 2 +- .../recovery/RecoverySourceHandler.java | 2 + .../indices/recovery/RecoveryTarget.java | 60 ++++-- .../indices/recovery/IndexRecoveryIT.java | 131 +++++++++++- .../recovery/RecoveriesCollectionTests.java | 54 ++--- 7 files changed, 372 insertions(+), 165 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java index 11554358b30..894399e851e 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/PeerRecoveryTargetService.java @@ -48,6 +48,7 @@ import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardNotFoundException; import org.elasticsearch.index.shard.TranslogRecoveryPerformer; import org.elasticsearch.index.store.Store; +import org.elasticsearch.indices.recovery.RecoveriesCollection.RecoveryRef; import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ConnectTransportException; @@ -139,65 +140,85 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde threadPool.generic().execute(new RecoveryRunner(recoveryId)); } - protected void retryRecovery(final RecoveryTarget recoveryTarget, final Throwable reason, TimeValue retryAfter, final - StartRecoveryRequest currentRequest) { + protected void retryRecovery(final long recoveryId, final Throwable reason, TimeValue retryAfter, TimeValue activityTimeout) { logger.trace( (Supplier) () -> new ParameterizedMessage( - "will retry recovery with id [{}] in [{}]", recoveryTarget.recoveryId(), retryAfter), reason); - retryRecovery(recoveryTarget, retryAfter, currentRequest); + "will retry recovery with id [{}] in [{}]", recoveryId, retryAfter), reason); + retryRecovery(recoveryId, retryAfter, activityTimeout); } - protected void retryRecovery(final RecoveryTarget recoveryTarget, final String reason, TimeValue retryAfter, final - StartRecoveryRequest currentRequest) { - logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryTarget.recoveryId(), retryAfter, reason); - retryRecovery(recoveryTarget, retryAfter, currentRequest); + protected void retryRecovery(final long recoveryId, final String reason, TimeValue retryAfter, TimeValue activityTimeout) { + logger.trace("will retry recovery with id [{}] in [{}] (reason [{}])", recoveryId, retryAfter, reason); + retryRecovery(recoveryId, retryAfter, activityTimeout); } - private void retryRecovery(final RecoveryTarget recoveryTarget, TimeValue retryAfter, final StartRecoveryRequest currentRequest) { - try { - onGoingRecoveries.resetRecovery(recoveryTarget.recoveryId(), recoveryTarget.shardId()); - } catch (Exception e) { - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(currentRequest, e), true); + private void retryRecovery(final long recoveryId, TimeValue retryAfter, TimeValue activityTimeout) { + RecoveryTarget newTarget = onGoingRecoveries.resetRecovery(recoveryId, activityTimeout); + if (newTarget != null) { + threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(newTarget.recoveryId())); } - threadPool.schedule(retryAfter, ThreadPool.Names.GENERIC, new RecoveryRunner(recoveryTarget.recoveryId())); } - private void doRecovery(final RecoveryTarget recoveryTarget) { - assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; + private void doRecovery(final long recoveryId) { + final StartRecoveryRequest request; + final CancellableThreads cancellableThreads; + final RecoveryState.Timer timer; - logger.trace("collecting local files for {}", recoveryTarget); - Store.MetadataSnapshot metadataSnapshot = null; - try { - if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) { - // we are not going to copy any files, so don't bother listing files, potentially running - // into concurrency issues with the primary changing files underneath us. - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } else { - metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { + if (recoveryRef == null) { + logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId); + return; } - } catch (org.apache.lucene.index.IndexNotFoundException e) { - // happens on an empty folder. no need to log - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } catch (IOException e) { - logger.warn("error while listing local files, recover as if there are none", e); - metadataSnapshot = Store.MetadataSnapshot.EMPTY; - } catch (Exception e) { - // this will be logged as warning later on... - logger.trace("unexpected error while listing local files, failing recovery", e); - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), - new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); - return; - } - logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size()); - final StartRecoveryRequest request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(), - clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId()); + RecoveryTarget recoveryTarget = recoveryRef.target(); + assert recoveryTarget.sourceNode() != null : "can't do a recovery without a source node"; + + logger.trace("collecting local files for {}", recoveryTarget.sourceNode()); + Store.MetadataSnapshot metadataSnapshot; + try { + if (recoveryTarget.indexShard().indexSettings().isOnSharedFilesystem()) { + // we are not going to copy any files, so don't bother listing files, potentially running + // into concurrency issues with the primary changing files underneath us. + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } else { + metadataSnapshot = recoveryTarget.indexShard().snapshotStoreMetadata(); + } + logger.trace("{} local file count: [{}]", recoveryTarget, metadataSnapshot.size()); + } catch (org.apache.lucene.index.IndexNotFoundException e) { + // happens on an empty folder. no need to log + logger.trace("{} shard folder empty, recover all files", recoveryTarget); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } catch (IOException e) { + logger.warn("error while listing local files, recover as if there are none", e); + metadataSnapshot = Store.MetadataSnapshot.EMPTY; + } catch (Exception e) { + // this will be logged as warning later on... + logger.trace("unexpected error while listing local files, failing recovery", e); + onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), + new RecoveryFailedException(recoveryTarget.state(), "failed to list local files", e), true); + return; + } + + try { + logger.trace("{} preparing shard for peer recovery", recoveryTarget.shardId()); + recoveryTarget.indexShard().prepareForIndexRecovery(); + + request = new StartRecoveryRequest(recoveryTarget.shardId(), recoveryTarget.sourceNode(), + clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId()); + cancellableThreads = recoveryTarget.CancellableThreads(); + timer = recoveryTarget.state().getTimer(); + } catch (Exception e) { + // this will be logged as warning later on... + logger.trace("unexpected error while preparing shard for peer recovery, failing recovery", e); + onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), + new RecoveryFailedException(recoveryTarget.state(), "failed to prepare shard for recovery", e), true); + return; + } + } - final AtomicReference responseHolder = new AtomicReference<>(); try { - logger.trace("[{}][{}] starting recovery from {}", request.shardId().getIndex().getName(), request.shardId().id(), request - .sourceNode()); - recoveryTarget.indexShard().prepareForIndexRecovery(); - recoveryTarget.CancellableThreads().execute(() -> responseHolder.set( + logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode()); + final AtomicReference responseHolder = new AtomicReference<>(); + cancellableThreads.execute(() -> responseHolder.set( transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request, new FutureTransportResponseHandler() { @Override @@ -207,9 +228,9 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde }).txGet())); final RecoveryResponse recoveryResponse = responseHolder.get(); assert responseHolder != null; - final TimeValue recoveryTime = new TimeValue(recoveryTarget.state().getTimer().time()); + final TimeValue recoveryTime = new TimeValue(timer.time()); // do this through ongoing recoveries to remove it from the collection - onGoingRecoveries.markRecoveryAsDone(recoveryTarget.recoveryId()); + onGoingRecoveries.markRecoveryAsDone(recoveryId); if (logger.isTraceEnabled()) { StringBuilder sb = new StringBuilder(); sb.append('[').append(request.shardId().getIndex().getName()).append(']').append('[').append(request.shardId().id()) @@ -229,7 +250,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde .append("\n"); logger.trace("{}", sb); } else { - logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), recoveryTarget.sourceNode(), recoveryTime); + logger.debug("{} recovery done from [{}], took [{}]", request.shardId(), request.sourceNode(), recoveryTime); } } catch (CancellableThreads.ExecutionCancelledException e) { logger.trace("recovery cancelled", e); @@ -245,8 +266,8 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof CancellableThreads.ExecutionCancelledException) { // this can also come from the source wrapped in a RemoteTransportException - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source has canceled the" + - " recovery", cause), false); + onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, + "source has canceled the recovery", cause), false); return; } if (cause instanceof RecoveryEngineException) { @@ -262,31 +283,34 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde // here, we would add checks against exception that need to be retried (and not removeAndClean in this case) - if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException || cause instanceof - ShardNotFoundException) { + if (cause instanceof IllegalIndexShardStateException || cause instanceof IndexNotFoundException || + cause instanceof ShardNotFoundException) { // if the target is not ready yet, retry - retryRecovery(recoveryTarget, "remote shard not ready", recoverySettings.retryDelayStateSync(), request); + retryRecovery(recoveryId, "remote shard not ready", recoverySettings.retryDelayStateSync(), + recoverySettings.activityTimeout()); return; } if (cause instanceof DelayRecoveryException) { - retryRecovery(recoveryTarget, cause, recoverySettings.retryDelayStateSync(), request); + retryRecovery(recoveryId, cause, recoverySettings.retryDelayStateSync(), + recoverySettings.activityTimeout()); return; } if (cause instanceof ConnectTransportException) { - logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", recoveryTarget.shardId(), recoverySettings - .retryDelayNetwork(), cause.getMessage()); - retryRecovery(recoveryTarget, cause.getMessage(), recoverySettings.retryDelayNetwork(), request); + logger.debug("delaying recovery of {} for [{}] due to networking error [{}]", request.shardId(), + recoverySettings.retryDelayNetwork(), cause.getMessage()); + retryRecovery(recoveryId, cause.getMessage(), recoverySettings.retryDelayNetwork(), + recoverySettings.activityTimeout()); return; } if (cause instanceof AlreadyClosedException) { - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, "source shard is " + - "closed", cause), false); + onGoingRecoveries.failRecovery(recoveryId, + new RecoveryFailedException(request, "source shard is closed", cause), false); return; } - onGoingRecoveries.failRecovery(recoveryTarget.recoveryId(), new RecoveryFailedException(request, e), true); + onGoingRecoveries.failRecovery(recoveryId, new RecoveryFailedException(request, e), true); } } @@ -300,9 +324,9 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(RecoveryPrepareForTranslogOperationsRequest request, TransportChannel channel) throws Exception { - try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.status().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp()); + recoveryRef.target().prepareForTranslogOperations(request.totalTranslogOps(), request.getMaxUnsafeAutoIdTimestamp()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -312,9 +336,9 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(RecoveryFinalizeRecoveryRequest request, TransportChannel channel) throws Exception { - try (RecoveriesCollection.RecoveryRef recoveryRef = + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { - recoveryRef.status().finalizeRecovery(request.globalCheckpoint()); + recoveryRef.target().finalizeRecovery(request.globalCheckpoint()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -324,9 +348,9 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(RecoveryWaitForClusterStateRequest request, TransportChannel channel) throws Exception { - try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.status().ensureClusterStateVersion(request.clusterStateVersion()); + recoveryRef.target().ensureClusterStateVersion(request.clusterStateVersion()); } channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -336,10 +360,10 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws IOException { - try (RecoveriesCollection.RecoveryRef recoveryRef = + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId())) { final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger, threadPool.getThreadContext()); - final RecoveryTarget recoveryTarget = recoveryRef.status(); + final RecoveryTarget recoveryTarget = recoveryRef.target(); try { recoveryTarget.indexTranslogOperations(request.operations(), request.totalTranslogOps()); channel.sendResponse(TransportResponse.Empty.INSTANCE); @@ -443,9 +467,9 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(RecoveryFilesInfoRequest request, TransportChannel channel) throws Exception { - try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.status().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, + recoveryRef.target().receiveFileInfo(request.phase1FileNames, request.phase1FileSizes, request.phase1ExistingFileNames, request.phase1ExistingFileSizes, request.totalTranslogOps); channel.sendResponse(TransportResponse.Empty.INSTANCE); } @@ -456,9 +480,9 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(RecoveryCleanFilesRequest request, TransportChannel channel) throws Exception { - try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - recoveryRef.status().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot()); + recoveryRef.target().cleanFiles(request.totalTranslogOps(), request.sourceMetaSnapshot()); channel.sendResponse(TransportResponse.Empty.INSTANCE); } } @@ -471,10 +495,10 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void messageReceived(final RecoveryFileChunkRequest request, TransportChannel channel) throws Exception { - try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecoverySafe(request.recoveryId(), request.shardId() )) { - final RecoveryTarget status = recoveryRef.status(); - final RecoveryState.Index indexState = status.state().getIndex(); + final RecoveryTarget recoveryTarget = recoveryRef.target(); + final RecoveryState.Index indexState = recoveryTarget.state().getIndex(); if (request.sourceThrottleTimeInNanos() != RecoveryState.Index.UNKNOWN) { indexState.addSourceThrottling(request.sourceThrottleTimeInNanos()); } @@ -487,11 +511,11 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde bytesSinceLastPause.addAndGet(-bytes); long throttleTimeInNanos = rateLimiter.pause(bytes); indexState.addTargetThrottling(throttleTimeInNanos); - status.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos); + recoveryTarget.indexShard().recoveryStats().addThrottleTime(throttleTimeInNanos); } } - status.writeFileChunk(request.metadata(), request.position(), request.content(), + recoveryTarget.writeFileChunk(request.metadata(), request.position(), request.content(), request.lastChunk(), request.totalTranslogOps() ); } @@ -509,13 +533,13 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void onFailure(Exception e) { - try (RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { + try (RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId)) { if (recoveryRef != null) { logger.error( (Supplier) () -> new ParameterizedMessage( "unexpected error during recovery [{}], failing shard", recoveryId), e); onGoingRecoveries.failRecovery(recoveryId, - new RecoveryFailedException(recoveryRef.status().state(), "unexpected error", e), + new RecoveryFailedException(recoveryRef.target().state(), "unexpected error", e), true // be safe ); } else { @@ -528,16 +552,7 @@ public class PeerRecoveryTargetService extends AbstractComponent implements Inde @Override public void doRun() { - RecoveriesCollection.RecoveryRef recoveryRef = onGoingRecoveries.getRecovery(recoveryId); - if (recoveryRef == null) { - logger.trace("not running recovery with id [{}] - can't find it (probably finished)", recoveryId); - return; - } - try { - doRecovery(recoveryRef.status()); - } finally { - recoveryRef.close(); - } + doRecovery(recoveryId); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 65a48b18e22..2ed79a8a996 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -33,7 +33,9 @@ import org.elasticsearch.index.shard.IndexShardClosedException; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.threadpool.ThreadPool; -import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -65,13 +67,18 @@ public class RecoveriesCollection { */ public long startRecovery(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, TimeValue activityTimeout) { - RecoveryTarget status = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback); - RecoveryTarget existingStatus = onGoingRecoveries.putIfAbsent(status.recoveryId(), status); - assert existingStatus == null : "found two RecoveryStatus instances with the same id"; - logger.trace("{} started recovery from {}, id [{}]", indexShard.shardId(), sourceNode, status.recoveryId()); + RecoveryTarget recoveryTarget = new RecoveryTarget(indexShard, sourceNode, listener, ensureClusterStateVersionCallback); + startRecoveryInternal(recoveryTarget, activityTimeout); + return recoveryTarget.recoveryId(); + } + + private void startRecoveryInternal(RecoveryTarget recoveryTarget, TimeValue activityTimeout) { + RecoveryTarget existingTarget = onGoingRecoveries.putIfAbsent(recoveryTarget.recoveryId(), recoveryTarget); + assert existingTarget == null : "found two RecoveryStatus instances with the same id"; + logger.trace("{} started recovery from {}, id [{}]", recoveryTarget.shardId(), recoveryTarget.sourceNode(), + recoveryTarget.recoveryId()); threadPool.schedule(activityTimeout, ThreadPool.Names.GENERIC, - new RecoveryMonitor(status.recoveryId(), status.lastAccessTime(), activityTimeout)); - return status.recoveryId(); + new RecoveryMonitor(recoveryTarget.recoveryId(), recoveryTarget.lastAccessTime(), activityTimeout)); } @@ -79,22 +86,48 @@ public class RecoveriesCollection { * Resets the recovery and performs a recovery restart on the currently recovering index shard * * @see IndexShard#performRecoveryRestart() + * @return newly created RecoveryTarget */ - public void resetRecovery(long id, ShardId shardId) throws IOException { - try (RecoveryRef ref = getRecoverySafe(id, shardId)) { - // instead of adding complicated state to RecoveryTarget we just flip the - // target instance when we reset a recovery, that way we have only one cleanup - // path on the RecoveryTarget and are always within the bounds of ref-counting - // which is important since we verify files are on disk etc. after we have written them etc. - RecoveryTarget status = ref.status(); - RecoveryTarget resetRecovery = status.resetRecovery(); - if (onGoingRecoveries.replace(id, status, resetRecovery) == false) { - resetRecovery.cancel("replace failed"); // this is important otherwise we leak a reference to the store - throw new IllegalStateException("failed to replace recovery target"); + public RecoveryTarget resetRecovery(final long recoveryId, TimeValue activityTimeout) { + RecoveryTarget oldRecoveryTarget = null; + final RecoveryTarget newRecoveryTarget; + + try { + synchronized (onGoingRecoveries) { + // swap recovery targets in a synchronized block to ensure that the newly added recovery target is picked up by + // cancelRecoveriesForShard whenever the old recovery target is picked up + oldRecoveryTarget = onGoingRecoveries.remove(recoveryId); + if (oldRecoveryTarget == null) { + return null; + } + + newRecoveryTarget = oldRecoveryTarget.retryCopy(); + startRecoveryInternal(newRecoveryTarget, activityTimeout); } + + // Closes the current recovery target + final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget; + final AtomicBoolean successfulReset = new AtomicBoolean(); + newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery())); + if (successfulReset.get() == false) { + cancelRecovery(newRecoveryTarget.recoveryId(), "failed to reset recovery"); + return null; + } else { + logger.trace("{} restarted recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(), + newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId()); + return newRecoveryTarget; + } + } catch (Exception e) { + // fail shard to be safe + oldRecoveryTarget.notifyListener(new RecoveryFailedException(oldRecoveryTarget.state(), "failed to retry recovery", e), true); + return null; } } + public RecoveryTarget getRecoveryTarget(long id) { + return onGoingRecoveries.get(id); + } + /** * gets the {@link RecoveryTarget } for a given id. The RecoveryStatus returned has it's ref count already incremented * to make sure it's safe to use. However, you must call {@link RecoveryTarget#decRef()} when you are done with it, typically @@ -116,7 +149,7 @@ public class RecoveriesCollection { if (recoveryRef == null) { throw new IndexShardClosedException(shardId); } - assert recoveryRef.status().shardId().equals(shardId); + assert recoveryRef.target().shardId().equals(shardId); return recoveryRef; } @@ -143,7 +176,8 @@ public class RecoveriesCollection { public void failRecovery(long id, RecoveryFailedException e, boolean sendShardFailure) { RecoveryTarget removed = onGoingRecoveries.remove(id); if (removed != null) { - logger.trace("{} failing recovery from {}, id [{}]. Send shard failure: [{}]", removed.shardId(), removed.sourceNode(), removed.recoveryId(), sendShardFailure); + logger.trace("{} failing recovery from {}, id [{}]. Send shard failure: [{}]", removed.shardId(), removed.sourceNode(), + removed.recoveryId(), sendShardFailure); removed.fail(e, sendShardFailure); } } @@ -171,11 +205,22 @@ public class RecoveriesCollection { */ public boolean cancelRecoveriesForShard(ShardId shardId, String reason) { boolean cancelled = false; - for (RecoveryTarget status : onGoingRecoveries.values()) { - if (status.shardId().equals(shardId)) { - cancelled |= cancelRecovery(status.recoveryId(), reason); + List matchedRecoveries = new ArrayList<>(); + synchronized (onGoingRecoveries) { + for (Iterator it = onGoingRecoveries.values().iterator(); it.hasNext(); ) { + RecoveryTarget status = it.next(); + if (status.shardId().equals(shardId)) { + matchedRecoveries.add(status); + it.remove(); + } } } + for (RecoveryTarget removed : matchedRecoveries) { + logger.trace("{} canceled recovery from {}, id [{}] (reason [{}])", + removed.shardId(), removed.sourceNode(), removed.recoveryId(), reason); + removed.cancel(reason); + cancelled = true; + } return cancelled; } @@ -205,7 +250,7 @@ public class RecoveriesCollection { } } - public RecoveryTarget status() { + public RecoveryTarget target() { return status; } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index d4ddccd8742..e238277b698 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -69,7 +69,7 @@ public class RecoverySettings extends AbstractComponent { */ public static final Setting INDICES_RECOVERY_ACTIVITY_TIMEOUT_SETTING = Setting.timeSetting("indices.recovery.recovery_activity_timeout", - INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING::get, TimeValue.timeValueSeconds(0), + INDICES_RECOVERY_INTERNAL_LONG_ACTION_TIMEOUT_SETTING::get, TimeValue.timeValueSeconds(0), Property.Dynamic, Property.NodeScope); public static final ByteSizeValue DEFAULT_CHUNK_SIZE = new ByteSizeValue(512, ByteSizeUnit.KB); diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index fa1a9a7979a..be055531813 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -148,6 +148,8 @@ public class RecoverySourceHandler { // engine was just started at the end of phase 1 if (shard.state() == IndexShardState.RELOCATED) { + assert request.isPrimaryRelocation() == false : + "recovery target should not retry primary relocation if previous attempt made it past finalization step"; /** * The primary shard has been relocated while we copied files. This means that we can't guarantee any more that all * operations that were replicated during the file copy (when the target engine was not yet opened) will be present in the diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 67ee1a5ac9a..981a5f3ed8f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -55,6 +55,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -87,17 +88,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget // last time this status was accessed private volatile long lastAccessTime = System.nanoTime(); + // latch that can be used to blockingly wait for RecoveryTarget to be closed + private final CountDownLatch closedLatch = new CountDownLatch(1); + private final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); - private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor - this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId, - copyFrom.ensureClusterStateVersionCallback); - } - - public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, - Callback ensureClusterStateVersionCallback) { - this(indexShard, sourceNode, listener, new CancellableThreads(), idGenerator.incrementAndGet(), ensureClusterStateVersionCallback); - } /** * creates a new recovery target object that represents a recovery to the provided indexShard * @@ -108,11 +103,11 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget * version. Necessary for primary relocation so that new primary knows about all other ongoing * replica recoveries when replicating documents (see {@link RecoverySourceHandler}). */ - private RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, - CancellableThreads cancellableThreads, long recoveryId, Callback ensureClusterStateVersionCallback) { + public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, PeerRecoveryTargetService.RecoveryListener listener, + Callback ensureClusterStateVersionCallback) { super("recovery_status"); - this.cancellableThreads = cancellableThreads; - this.recoveryId = recoveryId; + this.cancellableThreads = new CancellableThreads(); + this.recoveryId = idGenerator.incrementAndGet(); this.listener = listener; this.logger = Loggers.getLogger(getClass(), indexShard.indexSettings().getSettings(), indexShard.shardId()); this.indexShard = indexShard; @@ -126,6 +121,13 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget indexShard.recoveryStats().incCurrentAsTarget(); } + /** + * returns a fresh RecoveryTarget to retry recovery from the same source node onto the same IndexShard and using the same listener + */ + public RecoveryTarget retryCopy() { + return new RecoveryTarget(this.indexShard, this.sourceNode, this.listener, this.ensureClusterStateVersionCallback); + } + public long recoveryId() { return recoveryId; } @@ -177,19 +179,28 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } /** - * Closes the current recovery target and returns a - * clone to reset the ongoing recovery. - * Note: the returned target must be canceled, failed or finished - * in order to release all it's reference. + * Closes the current recovery target and waits up to a certain timeout for resources to be freed. + * Returns true if resetting the recovery was successful, false if the recovery target is already cancelled / failed or marked as done. */ - RecoveryTarget resetRecovery() throws IOException { - ensureRefCount(); + boolean resetRecovery() throws InterruptedException, IOException { if (finished.compareAndSet(false, true)) { + logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId); // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now decRef(); + closedLatch.await(); + RecoveryState.Stage stage = indexShard.recoveryState().getStage(); + if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) { + // once primary relocation has moved past the finalization step, the relocation source can be moved to RELOCATED state + // and start indexing as primary into the target shard (see TransportReplicationAction). Resetting the target shard in this + // state could mean that indexing is halted until the recovery retry attempt is completed and could also destroy existing + // documents indexed and acknowledged before the reset. + assert stage != RecoveryState.Stage.DONE : "recovery should not have completed when it's being reset"; + throw new IllegalStateException("cannot reset recovery as previous attempt made it past finalization step"); + } + indexShard.performRecoveryRestart(); + return true; } - indexShard.performRecoveryRestart(); - return new RecoveryTarget(this); + return false; } /** @@ -220,7 +231,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget public void fail(RecoveryFailedException e, boolean sendShardFailure) { if (finished.compareAndSet(false, true)) { try { - listener.onRecoveryFailure(state(), e, sendShardFailure); + notifyListener(e, sendShardFailure); } finally { try { cancellableThreads.cancel("failed recovery [" + ExceptionsHelper.stackTrace(e) + "]"); @@ -232,6 +243,10 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget } } + public void notifyListener(RecoveryFailedException e, boolean sendShardFailure) { + listener.onRecoveryFailure(state(), e, sendShardFailure); + } + /** mark the current recovery as done */ public void markAsDone() { if (finished.compareAndSet(false, true)) { @@ -309,6 +324,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget // free store. increment happens in constructor store.decRef(); indexShard.recoveryStats().decCurrentAsTarget(); + closedLatch.countDown(); } } diff --git a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 4e3c516aa72..6bbccb4cfb7 100644 --- a/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/core/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -31,8 +31,8 @@ import org.elasticsearch.action.admin.indices.stats.CommonStatsFlags; import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.RecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.PeerRecoverySource; import org.elasticsearch.cluster.routing.RecoverySource.SnapshotRecoverySource; @@ -42,6 +42,7 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.Index; import org.elasticsearch.index.recovery.RecoveryStats; import org.elasticsearch.index.store.Store; @@ -55,12 +56,12 @@ import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ESIntegTestCase.Scope; import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.junit.annotations.TestLogging; import org.elasticsearch.test.store.MockFSDirectoryService; import org.elasticsearch.test.store.MockFSIndexStore; import org.elasticsearch.test.transport.MockTransportService; import org.elasticsearch.transport.ConnectTransportException; import org.elasticsearch.transport.Transport; -import org.elasticsearch.transport.TransportException; import org.elasticsearch.transport.TransportRequest; import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; @@ -73,6 +74,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import static org.elasticsearch.node.RecoverySettingsChunkSizePlugin.CHUNK_SIZE_SETTING; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; @@ -653,4 +656,128 @@ public class IndexRecoveryIT extends ESIntegTestCase { super.sendRequest(connection, requestId, action, request, options); } } + + /** + * Tests scenario where recovery target successfully sends recovery request to source but then the channel gets closed while + * the source is working on the recovery process. + */ + @TestLogging("_root:DEBUG,org.elasticsearch.indices.recovery:TRACE") + public void testDisconnectsDuringRecovery() throws Exception { + boolean primaryRelocation = randomBoolean(); + final String indexName = "test"; + final Settings nodeSettings = Settings.builder() + .put(RecoverySettings.INDICES_RECOVERY_RETRY_DELAY_NETWORK_SETTING.getKey(), TimeValue.timeValueMillis(randomIntBetween(0, 100))) + .build(); + TimeValue disconnectAfterDelay = TimeValue.timeValueMillis(randomIntBetween(0, 100)); + // start a master node + String masterNodeName = internalCluster().startMasterOnlyNode(nodeSettings); + + final String blueNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "blue").put(nodeSettings).build()); + final String redNodeName = internalCluster().startNode(Settings.builder().put("node.attr.color", "red").put(nodeSettings).build()); + + client().admin().indices().prepareCreate(indexName) + .setSettings( + Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "blue") + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0) + ).get(); + + List requests = new ArrayList<>(); + int numDocs = scaledRandomIntBetween(25, 250); + for (int i = 0; i < numDocs; i++) { + requests.add(client().prepareIndex(indexName, "type").setSource("{}")); + } + indexRandom(true, requests); + ensureSearchable(indexName); + assertHitCount(client().prepareSearch(indexName).get(), numDocs); + + MockTransportService masterTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, masterNodeName); + MockTransportService blueMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, blueNodeName); + MockTransportService redMockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, redNodeName); + + redMockTransportService.addDelegate(blueMockTransportService, new MockTransportService.DelegateTransport(redMockTransportService.original()) { + private final AtomicInteger count = new AtomicInteger(); + + @Override + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + logger.info("--> sending request {} on {}", action, connection.getNode()); + if (PeerRecoverySourceService.Actions.START_RECOVERY.equals(action) && count.incrementAndGet() == 1) { + // ensures that it's considered as valid recovery attempt by source + try { + awaitBusy(() -> client(blueNodeName).admin().cluster().prepareState().setLocal(true).get() + .getState().getRoutingTable().index("test").shard(0).getAllInitializingShards().isEmpty() == false); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + super.sendRequest(connection, requestId, action, request, options); + try { + Thread.sleep(disconnectAfterDelay.millis()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + throw new ConnectTransportException(connection.getNode(), "DISCONNECT: simulation disconnect after successfully sending " + action + " request"); + } else { + super.sendRequest(connection, requestId, action, request, options); + } + } + }); + + final AtomicBoolean seenWaitForClusterState = new AtomicBoolean(); + blueMockTransportService.addDelegate(redMockTransportService, new MockTransportService.DelegateTransport(blueMockTransportService.original()) { + @Override + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + logger.info("--> sending request {} on {}", action, connection.getNode()); + if (action.equals(PeerRecoveryTargetService.Actions.WAIT_CLUSTERSTATE)) { + seenWaitForClusterState.set(true); + } + super.sendRequest(connection, requestId, action, request, options); + } + }); + + for (MockTransportService mockTransportService : Arrays.asList(redMockTransportService, blueMockTransportService)) { + mockTransportService.addDelegate(masterTransportService, new MockTransportService.DelegateTransport(mockTransportService.original()) { + @Override + protected void sendRequest(Connection connection, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException { + logger.info("--> sending request {} on {}", action, connection.getNode()); + if (primaryRelocation == false || seenWaitForClusterState.get() == false) { + assertNotEquals(action, ShardStateAction.SHARD_FAILED_ACTION_NAME); + } + super.sendRequest(connection, requestId, action, request, options); + } + }); + } + + if (primaryRelocation) { + logger.info("--> starting primary relocation recovery from blue to red"); + client().admin().indices().prepareUpdateSettings(indexName).setSettings( + Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "red") + ).get(); + + ensureGreen(); // also waits for relocation / recovery to complete + // if a primary relocation fails after the source shard has been marked as relocated, both source and target are failed. If the + // source shard is moved back to started because the target fails first, it's possible that there is a cluster state where the + // shard is marked as started again (and ensureGreen returns), but while applying the cluster state the primary is failed and + // will be reallocated. The cluster will thus become green, then red, then green again. Triggering a refresh here before + // searching helps, as in contrast to search actions, refresh waits for the closed shard to be reallocated. + client().admin().indices().prepareRefresh(indexName).get(); + } else { + logger.info("--> starting replica recovery from blue to red"); + client().admin().indices().prepareUpdateSettings(indexName).setSettings( + Settings.builder() + .put(IndexMetaData.INDEX_ROUTING_INCLUDE_GROUP_SETTING.getKey() + "color", "red,blue") + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + ).get(); + + ensureGreen(); + } + + for (int i = 0; i < 10; i++) { + assertHitCount(client().prepareSearch(indexName).get(), numDocs); + } + } } diff --git a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java index 5b1073beffa..bbc434120d8 100644 --- a/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java +++ b/core/src/test/java/org/elasticsearch/recovery/RecoveriesCollectionTests.java @@ -24,10 +24,12 @@ import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.replication.ESIndexLevelReplicationTestCase; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.Store; import org.elasticsearch.indices.recovery.RecoveriesCollection; import org.elasticsearch.indices.recovery.RecoveryFailedException; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; +import org.elasticsearch.indices.recovery.RecoveryTarget; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -54,10 +56,10 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef status = collection.getRecovery(recoveryId)) { - final long lastSeenTime = status.status().lastAccessTime(); + final long lastSeenTime = status.target().lastAccessTime(); assertBusy(() -> { try (RecoveriesCollection.RecoveryRef currentStatus = collection.getRecovery(recoveryId)) { - assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.status().lastAccessTime())); + assertThat("access time failed to update", lastSeenTime, lessThan(currentStatus.target().lastAccessTime())); } }); } finally { @@ -100,7 +102,7 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); final long recoveryId2 = startRecovery(collection, shards.getPrimaryNode(), shards.addReplica()); try (RecoveriesCollection.RecoveryRef recoveryRef = collection.getRecovery(recoveryId)) { - ShardId shardId = recoveryRef.status().shardId(); + ShardId shardId = recoveryRef.target().shardId(); assertTrue("failed to cancel recoveries", collection.cancelRecoveriesForShard(shardId, "test")); assertThat("all recoveries should be cancelled", collection.size(), equalTo(0)); } finally { @@ -118,30 +120,30 @@ public class RecoveriesCollectionTests extends ESIndexLevelReplicationTestCase { final RecoveriesCollection collection = new RecoveriesCollection(logger, threadPool, v -> {}); IndexShard shard = shards.addReplica(); final long recoveryId = startRecovery(collection, shards.getPrimaryNode(), shard); - try (RecoveriesCollection.RecoveryRef recovery = collection.getRecovery(recoveryId)) { - final int currentAsTarget = shard.recoveryStats().currentAsTarget(); - final int referencesToStore = recovery.status().store().refCount(); - String tempFileName = recovery.status().getTempNameForFile("foobar"); - collection.resetRecovery(recoveryId, recovery.status().shardId()); - try (RecoveriesCollection.RecoveryRef resetRecovery = collection.getRecovery(recoveryId)) { - assertNotSame(recovery.status(), resetRecovery); - assertSame(recovery.status().CancellableThreads(), resetRecovery.status().CancellableThreads()); - assertSame(recovery.status().indexShard(), resetRecovery.status().indexShard()); - assertSame(recovery.status().store(), resetRecovery.status().store()); - assertEquals(referencesToStore + 1, resetRecovery.status().store().refCount()); - assertEquals(currentAsTarget+1, shard.recoveryStats().currentAsTarget()); // we blink for a short moment... - recovery.close(); - expectThrows(ElasticsearchException.class, () -> recovery.status().store()); - assertEquals(referencesToStore, resetRecovery.status().store().refCount()); - String resetTempFileName = resetRecovery.status().getTempNameForFile("foobar"); - assertNotEquals(tempFileName, resetTempFileName); - } - assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); - } - try (RecoveriesCollection.RecoveryRef resetRecovery = collection.getRecovery(recoveryId)) { + RecoveryTarget recoveryTarget = collection.getRecoveryTarget(recoveryId); + final int currentAsTarget = shard.recoveryStats().currentAsTarget(); + final int referencesToStore = recoveryTarget.store().refCount(); + IndexShard indexShard = recoveryTarget.indexShard(); + Store store = recoveryTarget.store(); + String tempFileName = recoveryTarget.getTempNameForFile("foobar"); + RecoveryTarget resetRecovery = collection.resetRecovery(recoveryId, TimeValue.timeValueMinutes(60)); + final long resetRecoveryId = resetRecovery.recoveryId(); + assertNotSame(recoveryTarget, resetRecovery); + assertNotSame(recoveryTarget.CancellableThreads(), resetRecovery.CancellableThreads()); + assertSame(indexShard, resetRecovery.indexShard()); + assertSame(store, resetRecovery.store()); + assertEquals(referencesToStore, resetRecovery.store().refCount()); + assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); + assertEquals(recoveryTarget.refCount(), 0); + expectThrows(ElasticsearchException.class, () -> recoveryTarget.store()); + expectThrows(ElasticsearchException.class, () -> recoveryTarget.indexShard()); + String resetTempFileName = resetRecovery.getTempNameForFile("foobar"); + assertNotEquals(tempFileName, resetTempFileName); + assertEquals(currentAsTarget, shard.recoveryStats().currentAsTarget()); + try (RecoveriesCollection.RecoveryRef newRecoveryRef = collection.getRecovery(resetRecoveryId)) { shards.recoverReplica(shard, (s, n) -> { - assertSame(s, resetRecovery.status().indexShard()); - return resetRecovery.status(); + assertSame(s, newRecoveryRef.target().indexShard()); + return newRecoveryRef.target(); }, false); } shards.assertAllEqual(numDocs);