From 302988745110baa401885556289c0192595404b4 Mon Sep 17 00:00:00 2001 From: Nhat Nguyen Date: Wed, 21 Aug 2019 18:16:58 -0400 Subject: [PATCH] Never release store using CancellableThreads (#45409) Today we can release a Store using CancellableThreads. If we are holding the last reference, then we will verify the node lock before deleting the store. Checking the node lock performs some I/O on a FileChannel. If the current thread is interrupted, then the channel will be closed and the node lock will become invalid as well. Closes #45237 --- .../recovery/PeerRecoverySourceService.java | 5 ++++ .../recovery/RecoverySourceHandler.java | 24 +++++++++++++++++-- .../indices/recovery/IndexRecoveryIT.java | 17 +++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java index a4be25ae4fb..ef47b153f53 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/PeerRecoverySourceService.java @@ -112,6 +112,11 @@ public class PeerRecoverySourceService implements IndexEventListener { } } + // exposed for testing + final int numberOfOngoingRecoveries() { + return ongoingRecoveries.ongoingRecoveries.size(); + } + final class OngoingRecoveries { private final Map ongoingRecoveries = new HashMap<>(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java index 5405929bfb6..65bcc1942ef 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoverySourceHandler.java @@ -33,7 +33,9 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ExceptionsHelper; import 
org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.StepListener; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.ThreadedActionListener; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -232,8 +234,7 @@ public class RecoverySourceHandler { try { final int estimateNumOps = shard.estimateNumberOfHistoryOperations("peer-recovery", startingSeqNo); - shard.store().incRef(); - final Releasable releaseStore = Releasables.releaseOnce(shard.store()::decRef); + final Releasable releaseStore = acquireStore(shard.store()); resources.add(releaseStore); sendFileStep.whenComplete(r -> IOUtils.close(safeCommitRef, releaseStore), e -> { try { @@ -396,6 +397,25 @@ public class RecoverySourceHandler { }); } + /** + * Increases the store reference and returns a {@link Releasable} that will decrease the store reference using the generic thread pool. + * We must never release the store using an interruptible thread as we can risk invalidating the node lock. 
+ */ + private Releasable acquireStore(Store store) { + store.incRef(); + return Releasables.releaseOnce(() -> { + final PlainActionFuture<Void> future = new PlainActionFuture<>(); + threadPool.generic().execute(new ActionRunnable<Void>(future) { + @Override + protected void doRun() { + store.decRef(); + listener.onResponse(null); + } + }); + FutureUtils.get(future); + }); + } + static final class SendFileResult { final List phase1FileNames; final List phase1FileSizes; diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index f169263a6cd..ae7d6f07183 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.stats.ShardStats; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -1488,4 +1489,20 @@ public class IndexRecoveryIT extends ESIntegTestCase { } ensureGreen(indexName); } + + public void testCancelRecoveryWithAutoExpandReplicas() throws Exception { + internalCluster().startMasterOnlyNode(); + assertAcked(client().admin().indices().prepareCreate("test") + .setSettings(Settings.builder().put(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS, "0-all")) + .setWaitForActiveShards(ActiveShardCount.NONE)); + internalCluster().startNode(); + internalCluster().startNode(); + client().admin().cluster().prepareReroute().setRetryFailed(true).get(); + assertAcked(client().admin().indices().prepareDelete("test")); // cancel 
recoveries + assertBusy(() -> { + for (PeerRecoverySourceService recoveryService : internalCluster().getDataNodeInstances(PeerRecoverySourceService.class)) { + assertThat(recoveryService.numberOfOngoingRecoveries(), equalTo(0)); + } + }); + } }