diff --git a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java index d57aff18d05..6dc7f51a070 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/discovery/SnapshotDisruptionIT.java @@ -24,9 +24,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.ClusterChangedEvent; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterStateListener; -import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.RepositoriesMetadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -137,7 +135,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { logger.info("--> waiting for disruption to start"); assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); - assertAllSnapshotsCompleted(); + awaitNoMoreRunningOperations(dataNode); logger.info("--> verify that snapshot was successful or no longer exist"); assertBusy(() -> { @@ -154,13 +152,13 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { logger.info("--> done"); future.get(); - assertAllSnapshotsCompleted(); + awaitNoMoreRunningOperations(masterNode1); } public void testDisruptionAfterFinalization() throws Exception { final String idxName = "test"; internalCluster().startMasterOnlyNodes(3); - internalCluster().startDataOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); ensureStableCluster(4); createRandomIndex(idxName); @@ -205,7 +203,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { logger.info("--> waiting for disruption to start"); assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES)); - assertAllSnapshotsCompleted(); + awaitNoMoreRunningOperations(dataNode); logger.info("--> verify that snapshot was successful or no longer exist"); assertBusy(() -> { @@ -233,7 +231,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { assertThat(sne.getSnapshotName(), is(snapshot)); } - assertAllSnapshotsCompleted(); + awaitNoMoreRunningOperations(dataNode); } public void testDisruptionAfterShardFinalization() throws Exception { @@ -322,7 +320,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { unblockNode(repoName, dataNode); networkDisruption.stopDisrupting(); - assertAllSnapshotsCompleted(); + awaitNoMoreRunningOperations(dataNode); logger.info("--> make sure isolated master responds to snapshot request"); final SnapshotException sne = @@ -330,25 +328,6 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase { assertThat(sne.getMessage(), endsWith("no longer master")); } - private void assertAllSnapshotsCompleted() throws Exception { - logger.info("--> wait until the snapshot is done"); - assertBusy(() -> { - ClusterState state = dataNodeClient().admin().cluster().prepareState().get().getState(); - SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY); - SnapshotDeletionsInProgress snapshotDeletionsInProgress = - state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY); - if (snapshots.entries().isEmpty() == false) { - logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state()); - fail("Snapshot is still running"); - } else if (snapshotDeletionsInProgress.hasDeletionsInProgress()) { - logger.info("Current snapshot deletion state [{}]", snapshotDeletionsInProgress); - fail("Snapshot deletion is still running"); - } else { - logger.info("Snapshot is no longer in the cluster state"); - } - }, 1L, TimeUnit.MINUTES); - } - private void assertSnapshotExists(String repository, String snapshot) { GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots(repository) .setSnapshots(snapshot).get(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java similarity index 89% rename from server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java rename to test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 96ca9a6e64f..781edb53eb5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -24,9 +24,13 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateObserver; +import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; @@ -37,6 +41,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.json.JsonXContent; +import org.elasticsearch.node.NodeClosedException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; @@ -48,6 +53,7 @@ import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.test.VersionUtils; +import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; import java.io.IOException; @@ -62,7 +68,9 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.empty; @@ -373,4 +381,36 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { protected void assertDocCount(String index, long count) { assertEquals(getCountForIndex(index), count); } + + protected void awaitNoMoreRunningOperations(String viaNode) throws Exception { + logger.info("--> verify no more operations in the cluster state"); + awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() && + state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false); + } + + private void awaitClusterState(String viaNode, Predicate statePredicate) throws Exception { + final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode); + final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode); + final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext()); + if (statePredicate.test(observer.setAndGetObservedState()) == false) { + final PlainActionFuture future = PlainActionFuture.newFuture(); + observer.waitForNextChange(new ClusterStateObserver.Listener() { + @Override + public void onNewClusterState(ClusterState state) { + future.onResponse(null); + } + + @Override + public void onClusterServiceClose() { + future.onFailure(new NodeClosedException(clusterService.localNode())); + } + + @Override + public void onTimeout(TimeValue timeout) { + future.onFailure(new TimeoutException()); + } + }, statePredicate); + future.get(30L, TimeUnit.SECONDS); + } + } } diff --git a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java index e9b0f3c3a08..c5f0c3abe58 100644 --- a/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java +++ b/x-pack/plugin/ilm/src/test/java/org/elasticsearch/xpack/slm/SLMSnapshotBlockingIntegTests.java @@ -13,9 +13,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.Strings; @@ -23,8 +21,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.plugins.Plugin; -import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase; import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException; import org.elasticsearch.snapshots.SnapshotInfo; import org.elasticsearch.snapshots.SnapshotMissingException; @@ -54,7 +52,6 @@ import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.empty; @@ -65,7 +62,7 @@ import static org.hamcrest.Matchers.greaterThan; * Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository} */ @ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0) -public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { +public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase { private static final String NEVER_EXECUTE_CRON_SCHEDULE = "* * * 31 FEB ? *"; private static final String REPO = "repo-id"; @@ -117,9 +114,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { for (int i = 0; i < docCount; i++) { index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar")); } - - // Create a snapshot repo - initializeRepo(REPO); + createRepository(REPO, "mock"); logger.info("--> creating policy {}", policyName); createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true); @@ -150,7 +145,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { logger.info("--> unblocking snapshots"); unblockAllDataNodes(REPO); - unblockRepo(REPO); + unblockNode(REPO, internalCluster().getMasterName()); // Cancel/delete the snapshot try { @@ -167,8 +162,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { for (int i = 0; i < docCount; i++) { index(indexName, "_doc", null, Collections.singletonMap("foo", "bar")); } - - initializeRepo(REPO); + createRepository(REPO, "mock"); logger.info("--> creating policy {}", policyId); createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true, @@ -190,12 +184,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { } }); - // Wait for all running snapshots to be cleared from cluster state - assertBusy(() -> { - logger.info("--> waiting for cluster state to be clear of snapshots"); - ClusterState state = client().admin().cluster().prepareState().setCustoms(true).get().getState(); - assertTrue("cluster state was not ready for deletion " + state, SnapshotRetentionTask.okayToDeleteSnapshots(state)); - }); + awaitNoMoreRunningOperations(randomFrom(dataNodeNames)); logger.info("--> indexing more docs to force new segment files"); for (int i = 0; i < docCount; i++) { @@ -211,11 +200,10 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { final String secondSnapName = executePolicy(policyId); logger.info("--> executed policy, got snapname [{}]", secondSnapName); - // Check that the executed snapshot shows up in the SLM output as in_progress + logger.info("--> Waiting for at least one data node to hit the block"); + waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L)); assertBusy(() -> { - logger.info("--> Waiting for at least one data node to hit the block"); - assertTrue(dataNodeNames.stream().anyMatch(node -> checkBlocked(node, REPO))); logger.info("--> at least one data node has hit the block"); GetSnapshotLifecycleAction.Response getResp = client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get(); @@ -238,7 +226,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { new ExecuteSnapshotRetentionAction.Request()).get().isAcknowledged()); logger.info("--> unblocking snapshots"); - unblockRepo(REPO); + unblockNode(REPO, internalCluster().getMasterName()); unblockAllDataNodes(REPO); // Check that the snapshot created by the policy has been removed by retention @@ -275,7 +263,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { assertThat(resp.getHits().getTotalHits().value, equalTo(2L)); }); } finally { - unblockRepo(REPO); + unblockNode(REPO, internalCluster().getMasterName()); unblockAllDataNodes(REPO); } } @@ -295,9 +283,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { final SnapshotState expectedUnsuccessfulState = partialSuccess ? SnapshotState.PARTIAL : SnapshotState.FAILED; // Setup createAndPopulateIndex(indexName); - - // Create a snapshot repo - initializeRepo(REPO); + createRepository(REPO, "mock"); createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true, partialSuccess, new SnapshotRetentionConfiguration(null, 1, 2)); @@ -355,7 +341,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { createAndPopulateIndex(indexName); logger.info("--> unblocking snapshots"); - unblockRepo(REPO); + unblockNode(REPO, internalCluster().getMasterName()); unblockAllDataNodes(REPO); logger.info("--> taking new snapshot"); @@ -405,18 +391,14 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { assertEquals(SnapshotState.SUCCESS, snapshotInfo.state()); }); } + awaitNoMoreRunningOperations(internalCluster().getMasterName()); } public void testSLMRetentionAfterRestore() throws Exception { final String indexName = "test"; final String policyName = "test-policy"; - int docCount = 20; - for (int i = 0; i < docCount; i++) { - index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar")); - } - - // Create a snapshot repo - initializeRepo(REPO); + indexRandomDocs(indexName, 20); + createRepository(REPO, "mock"); logger.info("--> creating policy {}", policyName); createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true, false, @@ -475,27 +457,9 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { private void createAndPopulateIndex(String indexName) throws InterruptedException { logger.info("--> creating and populating index [{}]", indexName); - assertAcked(prepareCreate(indexName, 0, Settings.builder() - .put("number_of_shards", 6).put("number_of_replicas", 0))); + assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(6))); ensureGreen(); - - final int numdocs = randomIntBetween(50, 100); - IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs]; - for (int i = 0; i < builders.length; i++) { - builders[i] = client().prepareIndex(indexName, SINGLE_MAPPING_NAME, Integer.toString(i)).setSource("field1", "bar " + i); - } - indexRandom(true, builders); - flushAndRefresh(); - } - - private void initializeRepo(String repoName) { - client().admin().cluster().preparePutRepository(repoName) - .setType("mock") - .setSettings(Settings.builder() - .put("compress", randomBoolean()) - .put("location", randomAlphaOfLength(6)) - .build()) - .get(); + indexRandomDocs(indexName, randomIntBetween(50, 100)); } private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String REPO, @@ -546,49 +510,4 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase { return "bad"; } } - - public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) { - final String masterName = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) - .repository(repositoryName)).setBlockOnWriteIndexFile(true); - return masterName; - } - - public static String unblockRepo(final String repositoryName) { - final String masterName = internalCluster().getMasterName(); - ((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName) - .repository(repositoryName)).unblock(); - return masterName; - } - - public static void blockAllDataNodes(String repository) { - for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { - ((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true); - } - } - - public static void unblockAllDataNodes(String repository) { - for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) { - ((MockRepository)repositoriesService.repository(repository)).unblock(); - } - } - - public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException { - long start = System.currentTimeMillis(); - RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node); - MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); - while (System.currentTimeMillis() - start < timeout.millis()) { - if (mockRepository.blocked()) { - return; - } - Thread.sleep(100); - } - fail("Timeout waiting for node [" + node + "] to be blocked"); - } - - public boolean checkBlocked(String node, String repository) { - RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node); - MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository); - return mockRepository.blocked(); - } }