diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 2ed79a8a996..3bee3febf3f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.Callback; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.IndexShard; @@ -106,9 +107,18 @@ public class RecoveriesCollection { } // Closes the current recovery target - final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget; final AtomicBoolean successfulReset = new AtomicBoolean(); - newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery())); + try { + final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget; + newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery())); + } catch (CancellableThreads.ExecutionCancelledException e) { + // new recovery target is already cancelled (probably due to shard closing or recovery source changing) + assert onGoingRecoveries.containsKey(newRecoveryTarget.recoveryId()) == false; + logger.trace("{} recovery reset cancelled, recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(), + newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId()); + oldRecoveryTarget.cancel("recovery reset cancelled"); // if finalOldRecoveryTarget.resetRecovery did not even get to execute + return null; + } if (successfulReset.get() == false) { cancelRecovery(newRecoveryTarget.recoveryId(), "failed to reset recovery"); return null; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 981a5f3ed8f..4311d3b2ab1 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -184,9 +184,14 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget */ boolean resetRecovery() throws InterruptedException, IOException { if (finished.compareAndSet(false, true)) { - logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId); - // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now - decRef(); + try { + // yes, this is just a logger call in a try-finally block. The reason for this is that resetRecovery is called from + // CancellableThreads and we have to make sure that all references to IndexShard are cleaned up before exiting this method + logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId); + } finally { + // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now. + decRef(); + } closedLatch.await(); RecoveryState.Stage stage = indexShard.recoveryState().getStage(); if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) {