From f79fb4ada7995c11874346787db276b6edafaa8f Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 19 Jul 2016 11:54:52 +0200 Subject: [PATCH] Create RecoveryTarget once we reset the source RecoveryTarget increments a reference on the store once it's created. If we fail to return the instance from the reset method we leak a reference causing shard locks to not be released. This change creates the reference in the return statement to ensure no references are leaked --- .../indices/recovery/RecoveriesCollection.java | 2 +- .../elasticsearch/indices/recovery/RecoveryTarget.java | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java index c7aa8287875..e1a197533e6 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveriesCollection.java @@ -85,7 +85,7 @@ public class RecoveriesCollection { RecoveryTarget status = ref.status(); RecoveryTarget resetRecovery = status.resetRecovery(); if (onGoingRecoveries.replace(id, status, resetRecovery) == false) { - resetRecovery.cancel("replace failed"); + resetRecovery.cancel("replace failed"); // this is important otherwise we leak a reference to the store throw new IllegalStateException("failed to replace recovery target"); } } diff --git a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index fe8c7abdb82..683b0a87eac 100644 --- a/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/core/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -87,7 +87,7 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget private final Map tempFileNames = ConcurrentCollections.newConcurrentMap(); private RecoveryTarget(RecoveryTarget copyFrom) { // copy constructor - this(copyFrom.indexShard(), copyFrom.sourceNode(), copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId()); + this(copyFrom.indexShard, copyFrom.sourceNode, copyFrom.listener, copyFrom.cancellableThreads, copyFrom.recoveryId); } public RecoveryTarget(IndexShard indexShard, DiscoveryNode sourceNode, RecoveryTargetService.RecoveryListener listener) { @@ -163,17 +163,18 @@ public class RecoveryTarget extends AbstractRefCounted implements RecoveryTarget /** * Closes the current recovery target and returns a - * clone to reset the ongoing recovery + * clone to reset the ongoing recovery. + * Note: the returned target must be canceled, failed or finished + * in order to release all it's reference. */ RecoveryTarget resetRecovery() throws IOException { ensureRefCount(); - RecoveryTarget copy = new RecoveryTarget(this); if (finished.compareAndSet(false, true)) { // release the initial reference. recovery files will be cleaned as soon as ref count goes to zero, potentially now decRef(); } indexShard.performRecoveryRestart(); - return copy; + return new RecoveryTarget(this); } /**