From aa0dc5641272f0e46e4b63fc850f77f1451c4ee5 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 22 Sep 2020 11:00:18 +0200
Subject: [PATCH] Ensure MockRepository is Unblocked on Node Close (#62711) (#62748)

`RepositoriesService#doClose` was never called, which led to mock repositories
not unblocking until the `ThreadPool` interrupts all threads. Thus, stopping a
node that is blocked on a mock repository operation wastes `10s` in each test
that does it (which is quite a few, as it turns out).
---
 .../blobstore/BlobStoreRepositoryCleanupIT.java           | 5 +++++
 .../elasticsearch/snapshots/ConcurrentSnapshotsIT.java    | 5 -----
 server/src/main/java/org/elasticsearch/node/Node.java     | 1 +
 .../snapshots/AbstractSnapshotIntegTestCase.java          | 4 ++++
 .../elasticsearch/snapshots/mockstore/MockRepository.java | 8 ++++++--
 5 files changed, 16 insertions(+), 7 deletions(-)

diff --git a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
index fc4ac0d0d63..2a660709f48 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryCleanupIT.java
@@ -42,9 +42,12 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
     public void testMasterFailoverDuringCleanup() throws Exception {
         startBlockedCleanup("test-repo");
 
+        final int nodeCount = internalCluster().numDataAndMasterNodes();
         logger.info("--> stopping master node");
         internalCluster().stopCurrentMasterNode();
 
+        ensureStableCluster(nodeCount - 1);
+
         logger.info("--> wait for cleanup to finish and disappear from cluster state");
         assertBusy(() -> {
             RepositoryCleanupInProgress cleanupInProgress =
@@ -102,6 +105,8 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
         logger.info("--> waiting for block to kick in on " + masterNode);
         waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
 
+        awaitClusterState(state ->
+            state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
         return masterNode;
     }
 
diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
index f0e8f4b4b1d..d13a8419321 100644
--- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
+++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java
@@ -61,7 +61,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Predicate;
 
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
 import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@@ -1283,10 +1282,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
                 .setPartial(partial).execute();
     }
 
-    private void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
-        awaitClusterState(internalCluster().getMasterName(), statePredicate);
-    }
-
     // Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
     // threads so that blocking some threads on one repository doesn't block other repositories from doing work
     private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()
diff --git a/server/src/main/java/org/elasticsearch/node/Node.java b/server/src/main/java/org/elasticsearch/node/Node.java
index 5e1208b6913..5a5642d65c4 100644
--- a/server/src/main/java/org/elasticsearch/node/Node.java
+++ b/server/src/main/java/org/elasticsearch/node/Node.java
@@ -947,6 +947,7 @@ public class Node implements Closeable {
         toClose.add(() -> stopWatch.stop().start("snapshot_service"));
         toClose.add(injector.getInstance(SnapshotsService.class));
         toClose.add(injector.getInstance(SnapshotShardsService.class));
+        toClose.add(injector.getInstance(RepositoriesService.class));
         toClose.add(() -> stopWatch.stop().start("client"));
         Releasables.close(injector.getInstance(Client.class));
         toClose.add(() -> stopWatch.stop().start("indices_cluster"));
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index 2abb0d5b69b..5236780a575 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -440,6 +440,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
             state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
     }
 
+    protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
+        awaitClusterState(internalCluster().getMasterName(), statePredicate);
+    }
+
     protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
         final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
         final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
index 95355a58508..a91e95c836c 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/mockstore/MockRepository.java
@@ -322,9 +322,13 @@ public class MockRepository extends FsRepository {
             }
         }
 
-        private void blockExecutionAndMaybeWait(final String blobName) {
+        private void blockExecutionAndMaybeWait(final String blobName) throws IOException {
             logger.info("[{}] blocking I/O operation for file [{}] at path [{}]", metadata.name(), blobName, path());
-            if (blockExecution() && waitAfterUnblock > 0) {
+            final boolean wasBlocked = blockExecution();
+            if (wasBlocked && lifecycle.stoppedOrClosed()) {
+                throw new IOException("already closed");
+            }
+            if (wasBlocked && waitAfterUnblock > 0) {
                 try {
                     // Delay operation after unblocking
                     // So, we can start node shutdown while this operation is still running.
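The one-line Node.java change works because Node#close drains a list of Closeables during shutdown; a service that is never added to that list never gets its close hook invoked, which is exactly how `RepositoriesService#doClose` was being skipped. What follows is a rough sketch of that collect-and-close pattern, assuming invented names (NodeSketch, Service) rather than the real Node internals, which also interleave stopwatch markers and many more services between entries.

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a stand-in for the "collect Closeables, close them in one pass" shutdown pattern.
public class NodeSketch implements Closeable {

    interface Service extends Closeable {}

    private final List<Closeable> toClose = new ArrayList<>();

    NodeSketch(Service snapshotsService, Service repositoriesService) {
        toClose.add(snapshotsService);
        // The gist of the fix: without this registration, the repositories service's close hook
        // never runs and blocked mock repositories stay blocked until thread interruption.
        toClose.add(repositoriesService);
    }

    @Override
    public void close() throws IOException {
        IOException failure = null;
        for (Closeable closeable : toClose) {   // close everything, remembering the first failure
            try {
                closeable.close();
            } catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        if (failure != null) {
            throw failure;
        }
    }
}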
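To see why registering the repositories service for close matters, here is a minimal, self-contained sketch of the unblock-on-close behaviour the MockRepository hunk above relies on. It is not the actual MockRepository code: the class, fields, and main() driver are invented for illustration, with the block held on a plain object monitor. A blocked operation waits until the test unblocks it or the repository is closed; once closed, the released thread fails fast with "already closed" instead of hanging until the ThreadPool interrupts it.

import java.io.Closeable;
import java.io.IOException;

// Illustrative only: a stand-in for the mock repository's block/unblock machinery.
public class BlockingRepositorySketch implements Closeable {

    private boolean blocked = true;   // a test would set this to block the next operation
    private boolean closed = false;

    // Waits while the repository is blocked; returns true if the caller actually blocked.
    private synchronized boolean blockExecution() {
        boolean wasBlocked = false;
        try {
            while (blocked && closed == false) {
                wasBlocked = true;
                wait();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return wasBlocked;
    }

    private synchronized boolean isClosed() {
        return closed;
    }

    // Called by the repository operation under test.
    public void runBlockedOperation() throws IOException {
        final boolean wasBlocked = blockExecution();
        if (wasBlocked && isClosed()) {
            // Mirrors the patched behaviour: a thread released because the node is shutting
            // down fails fast instead of pretending the operation succeeded.
            throw new IOException("already closed");
        }
        // ... the real blob store operation would run here ...
    }

    // Without a close hook like this, blocked threads sit until thread interruption (the ~10s the commit message mentions).
    @Override
    public synchronized void close() {
        closed = true;
        blocked = false;
        notifyAll();
    }

    public static void main(String[] args) throws Exception {
        BlockingRepositorySketch repo = new BlockingRepositorySketch();
        Thread operation = new Thread(() -> {
            try {
                repo.runBlockedOperation();
            } catch (IOException e) {
                System.out.println("operation failed: " + e.getMessage());
            }
        });
        operation.start();
        Thread.sleep(200);   // let the operation reach the block
        repo.close();        // releases the blocked thread immediately
        operation.join();
    }
}

In the patch itself the hook is wired through Node#close, which now closes RepositoriesService so that repository close, and with it the mock repository's unblocking, runs as part of node shutdown rather than waiting for thread interruption.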