From 816e1c6cc4db70727e4f006646330dea7abb92cb Mon Sep 17 00:00:00 2001 From: Yannick Welsch Date: Thu, 29 Dec 2016 16:51:37 +0100 Subject: [PATCH] Free shard resources when recovery reset is cancelled Resetting a recovery consists of resetting the old recovery target and replacing it with a new recovery target object. This is done on the CancellableThreads of the new recovery target. If the new recovery target is already cancelled before or while this happens, for example due to shard closing or recovery source changing, we have to make sure that the old recovery target object frees all shard resources. Relates to #22325 --- .../indices/recovery/RecoveriesCollection.java | 14 ++++++++++++-- .../indices/recovery/RecoveryTarget.java | 11 ++++++++--- 2 files changed, 20 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index 2ed79a8a996..3bee3febf3f 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -26,6 +26,7 @@ import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.Callback; +import org.elasticsearch.common.util.CancellableThreads; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.index.shard.IndexShard; @@ -106,9 +107,18 @@ public class RecoveriesCollection { } // Closes the current recovery target - final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget; final AtomicBoolean successfulReset = new AtomicBoolean(); - newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery())); 
+ try { + final RecoveryTarget finalOldRecoveryTarget = oldRecoveryTarget; + newRecoveryTarget.CancellableThreads().executeIO(() -> successfulReset.set(finalOldRecoveryTarget.resetRecovery())); + } catch (CancellableThreads.ExecutionCancelledException e) { + // new recovery target is already cancelled (probably due to shard closing or recovery source changing) + assert onGoingRecoveries.containsKey(newRecoveryTarget.recoveryId()) == false; + logger.trace("{} recovery reset cancelled, recovery from {}, id [{}], previous id [{}]", newRecoveryTarget.shardId(), + newRecoveryTarget.sourceNode(), newRecoveryTarget.recoveryId(), oldRecoveryTarget.recoveryId()); + oldRecoveryTarget.cancel("recovery reset cancelled"); // if finalOldRecoveryTarget.resetRecovery did not even get to execute + return null; + } if (successfulReset.get() == false) { cancelRecovery(newRecoveryTarget.recoveryId(), "failed to reset recovery"); return null; diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 981a5f3ed8f..4311d3b2ab1 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -184,9 +184,14 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget */ boolean resetRecovery() throws InterruptedException, IOException { if (finished.compareAndSet(false, true)) { - logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId); - // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now - decRef(); + try { + // yes, this is just a logger call in a try-finally block. 
The reason for this is that resetRecovery is called from + // CancellableThreads and we have to make sure that all references to IndexShard are cleaned up before exiting this method + logger.debug("reset of recovery with shard {} and id [{}]", shardId, recoveryId); + } finally { + // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now. + decRef(); + } closedLatch.await(); RecoveryState.Stage stage = indexShard.recoveryState().getStage(); if (indexShard.recoveryState().getPrimary() && (stage == RecoveryState.Stage.FINALIZE || stage == RecoveryState.Stage.DONE)) {