Ensure MockRepository is Unblocked on Node Close (#62711) (#62748)

`RepositoriesService#doClose` was never called, which led to
mock repositories not unblocking until the `ThreadPool` interrupted
all threads. Thus, stopping a node that is blocked on a mock repository operation wasted `10s`
in each test that does so (which is quite a few, as it turns out).
This commit is contained in:
Armin Braun 2020-09-22 11:00:18 +02:00 committed by GitHub
parent 4bdbc39e9f
commit aa0dc56412
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 16 additions and 7 deletions

View File

@ -42,9 +42,12 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
public void testMasterFailoverDuringCleanup() throws Exception {
startBlockedCleanup("test-repo");
final int nodeCount = internalCluster().numDataAndMasterNodes();
logger.info("--> stopping master node");
internalCluster().stopCurrentMasterNode();
ensureStableCluster(nodeCount - 1);
logger.info("--> wait for cleanup to finish and disappear from cluster state");
assertBusy(() -> {
RepositoryCleanupInProgress cleanupInProgress =
@ -102,6 +105,8 @@ public class BlobStoreRepositoryCleanupIT extends AbstractSnapshotIntegTestCase
logger.info("--> waiting for block to kick in on " + masterNode);
waitForBlock(masterNode, repoName, TimeValue.timeValueSeconds(60));
awaitClusterState(state ->
state.custom(RepositoryCleanupInProgress.TYPE, RepositoryCleanupInProgress.EMPTY).hasCleanupInProgress());
return masterNode;
}

View File

@ -61,7 +61,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
@ -1283,10 +1282,6 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
.setPartial(partial).execute();
}
/**
 * Convenience overload that forwards to the two-argument {@code awaitClusterState},
 * passing the node name returned by {@code internalCluster().getMasterName()}.
 *
 * @param statePredicate predicate over the cluster state, handed unchanged to the two-argument overload
 * @throws Exception declared to match the delegated call
 */
private void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
awaitClusterState(internalCluster().getMasterName(), statePredicate);
}
// Large snapshot pool settings to set up nodes for tests involving multiple repositories that need to have enough
// threads so that blocking some threads on one repository doesn't block other repositories from doing work
private static final Settings LARGE_SNAPSHOT_POOL_SETTINGS = Settings.builder()

View File

@ -947,6 +947,7 @@ public class Node implements Closeable {
toClose.add(() -> stopWatch.stop().start("snapshot_service"));
toClose.add(injector.getInstance(SnapshotsService.class));
toClose.add(injector.getInstance(SnapshotShardsService.class));
toClose.add(injector.getInstance(RepositoriesService.class));
toClose.add(() -> stopWatch.stop().start("client"));
Releasables.close(injector.getInstance(Client.class));
toClose.add(() -> stopWatch.stop().start("indices_cluster"));

View File

@ -440,6 +440,10 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
}
/**
 * Convenience overload of {@link #awaitClusterState(String, Predicate)} that uses the node name
 * returned by {@code internalCluster().getMasterName()} as the node to go through.
 *
 * @param statePredicate predicate over the cluster state, handed unchanged to the two-argument overload
 * @throws Exception declared to match the delegated call
 */
protected void awaitClusterState(Predicate<ClusterState> statePredicate) throws Exception {
awaitClusterState(internalCluster().getMasterName(), statePredicate);
}
protected void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);

View File

@ -322,9 +322,13 @@ public class MockRepository extends FsRepository {
}
}
private void blockExecutionAndMaybeWait(final String blobName) {
private void blockExecutionAndMaybeWait(final String blobName) throws IOException {
logger.info("[{}] blocking I/O operation for file [{}] at path [{}]", metadata.name(), blobName, path());
if (blockExecution() && waitAfterUnblock > 0) {
final boolean wasBlocked = blockExecution();
if (wasBlocked && lifecycle.stoppedOrClosed()) {
throw new IOException("already closed");
}
if (wasBlocked && waitAfterUnblock > 0) {
try {
// Delay operation after unblocking
// So, we can start node shutdown while this operation is still running.