Share IT Infrastructure between Core Snapshot and SLM ITs (#59082) (#59119)

For #58994 it would be useful to be able to share test infrastructure.
This PR shares `AbstractSnapshotIntegTestCase` for that purpose and dries up the SLM tests accordingly.
It also adds to the shared test infrastructure an efficient (compared to the previous implementations)
way of waiting until no snapshot operations are running, which dries things up further.
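For illustration only (not part of this commit), here is a minimal sketch of how a test extending the shared base class can use the new helper; the test class, repository, and index names are hypothetical:

```java
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;

// Hypothetical test built on the shared infrastructure: create a repo and some docs,
// start a snapshot without waiting for completion, then wait until the cluster state
// no longer contains any running snapshot or snapshot-deletion operations.
public class ExampleSnapshotIT extends AbstractSnapshotIntegTestCase {

    public void testOperationsEventuallyClear() throws Exception {
        createRepository("example-repo", "mock");   // shared helper from the base class
        indexRandomDocs("example-index", 50);       // shared helper from the base class
        client().admin().cluster()
            .prepareCreateSnapshot("example-repo", "example-snap")
            .setWaitForCompletion(false)
            .get();
        // New shared wait: observes the cluster state via the given node until both
        // SnapshotsInProgress and SnapshotDeletionsInProgress are empty.
        awaitNoMoreRunningOperations(internalCluster().getMasterName());
    }
}
```

The single `awaitNoMoreRunningOperations` call replaces the bespoke `assertBusy` loops that the disruption and SLM tests carried before.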
Armin Braun 2020-07-07 12:04:41 +02:00 committed by GitHub
parent e217f9a1e8
commit d6d6df16bb
3 changed files with 63 additions and 125 deletions

SnapshotDisruptionIT.java

@@ -24,9 +24,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.RepositoriesMetadata;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -137,7 +135,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);
logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
@@ -154,13 +152,13 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
logger.info("--> done");
future.get();
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(masterNode1);
}
public void testDisruptionAfterFinalization() throws Exception {
final String idxName = "test";
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNode();
final String dataNode = internalCluster().startDataOnlyNode();
ensureStableCluster(4);
createRandomIndex(idxName);
@@ -205,7 +203,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
logger.info("--> waiting for disruption to start");
assertTrue(disruptionStarted.await(1, TimeUnit.MINUTES));
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);
logger.info("--> verify that snapshot was successful or no longer exist");
assertBusy(() -> {
@@ -233,7 +231,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
assertThat(sne.getSnapshotName(), is(snapshot));
}
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);
}
public void testDisruptionAfterShardFinalization() throws Exception {
@@ -322,7 +320,7 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
unblockNode(repoName, dataNode);
networkDisruption.stopDisrupting();
assertAllSnapshotsCompleted();
awaitNoMoreRunningOperations(dataNode);
logger.info("--> make sure isolated master responds to snapshot request");
final SnapshotException sne =
@@ -330,25 +328,6 @@ public class SnapshotDisruptionIT extends AbstractSnapshotIntegTestCase {
assertThat(sne.getMessage(), endsWith("no longer master"));
}
private void assertAllSnapshotsCompleted() throws Exception {
logger.info("--> wait until the snapshot is done");
assertBusy(() -> {
ClusterState state = dataNodeClient().admin().cluster().prepareState().get().getState();
SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
SnapshotDeletionsInProgress snapshotDeletionsInProgress =
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
if (snapshots.entries().isEmpty() == false) {
logger.info("Current snapshot state [{}]", snapshots.entries().get(0).state());
fail("Snapshot is still running");
} else if (snapshotDeletionsInProgress.hasDeletionsInProgress()) {
logger.info("Current snapshot deletion state [{}]", snapshotDeletionsInProgress);
fail("Snapshot deletion is still running");
} else {
logger.info("Snapshot is no longer in the cluster state");
}
}, 1L, TimeUnit.MINUTES);
}
private void assertSnapshotExists(String repository, String snapshot) {
GetSnapshotsResponse snapshotsStatusResponse = dataNodeClient().admin().cluster().prepareGetSnapshots(repository)
.setSnapshots(snapshot).get();

AbstractSnapshotIntegTestCase.java

@@ -24,9 +24,13 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -37,6 +41,7 @@ import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
@@ -48,6 +53,7 @@ import org.elasticsearch.snapshots.mockstore.MockRepository;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import java.io.IOException;
@@ -62,7 +68,9 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.empty;
@@ -373,4 +381,36 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
protected void assertDocCount(String index, long count) {
assertEquals(getCountForIndex(index), count);
}
protected void awaitNoMoreRunningOperations(String viaNode) throws Exception {
logger.info("--> verify no more operations in the cluster state");
awaitClusterState(viaNode, state -> state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries().isEmpty() &&
state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY).hasDeletionsInProgress() == false);
}
private void awaitClusterState(String viaNode, Predicate<ClusterState> statePredicate) throws Exception {
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, viaNode);
final ThreadPool threadPool = internalCluster().getInstance(ThreadPool.class, viaNode);
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, logger, threadPool.getThreadContext());
if (statePredicate.test(observer.setAndGetObservedState()) == false) {
final PlainActionFuture<Void> future = PlainActionFuture.newFuture();
observer.waitForNextChange(new ClusterStateObserver.Listener() {
@Override
public void onNewClusterState(ClusterState state) {
future.onResponse(null);
}
@Override
public void onClusterServiceClose() {
future.onFailure(new NodeClosedException(clusterService.localNode()));
}
@Override
public void onTimeout(TimeValue timeout) {
future.onFailure(new TimeoutException());
}
}, statePredicate);
future.get(30L, TimeUnit.SECONDS);
}
}
}
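For contrast with the observer-based helper added above, the pattern it replaces in the callers looked roughly like the following (reconstructed from the removed `assertAllSnapshotsCompleted`): the cluster state is re-fetched and re-checked on a timer rather than being reacted to as it changes.

```java
// Roughly the superseded polling approach: repeatedly fetch the cluster state with
// assertBusy until both in-progress customs are empty, failing after one minute.
assertBusy(() -> {
    ClusterState state = client().admin().cluster().prepareState().get().getState();
    SnapshotsInProgress snapshots = state.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY);
    SnapshotDeletionsInProgress deletions =
        state.custom(SnapshotDeletionsInProgress.TYPE, SnapshotDeletionsInProgress.EMPTY);
    assertTrue("a snapshot is still running", snapshots.entries().isEmpty());
    assertFalse("a snapshot deletion is still running", deletions.hasDeletionsInProgress());
}, 1L, TimeUnit.MINUTES);
```

The `ClusterStateObserver` version instead registers a listener and completes on the first published cluster state that satisfies the predicate, avoiding repeated state requests and sleep intervals; this is the efficiency gain referred to in the commit message.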

SLMSnapshotBlockingIntegTests.java

@@ -13,9 +13,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus;
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Strings;
@@ -23,8 +21,8 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.snapshots.AbstractSnapshotIntegTestCase;
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException;
import org.elasticsearch.snapshots.SnapshotInfo;
import org.elasticsearch.snapshots.SnapshotMissingException;
@@ -54,7 +52,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static org.elasticsearch.index.mapper.MapperService.SINGLE_MAPPING_NAME;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.empty;
@@ -65,7 +62,7 @@ import static org.hamcrest.Matchers.greaterThan;
* Tests for Snapshot Lifecycle Management that require a slow or blocked snapshot repo (using {@link MockRepository}
*/
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
public class SLMSnapshotBlockingIntegTests extends AbstractSnapshotIntegTestCase {
private static final String NEVER_EXECUTE_CRON_SCHEDULE = "* * * 31 FEB ? *";
private static final String REPO = "repo-id";
@@ -117,9 +114,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
for (int i = 0; i < docCount; i++) {
index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar"));
}
// Create a snapshot repo
initializeRepo(REPO);
createRepository(REPO, "mock");
logger.info("--> creating policy {}", policyName);
createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true);
@@ -150,7 +145,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
logger.info("--> unblocking snapshots");
unblockAllDataNodes(REPO);
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
// Cancel/delete the snapshot
try {
@@ -167,8 +162,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
for (int i = 0; i < docCount; i++) {
index(indexName, "_doc", null, Collections.singletonMap("foo", "bar"));
}
initializeRepo(REPO);
createRepository(REPO, "mock");
logger.info("--> creating policy {}", policyId);
createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true,
@@ -190,12 +184,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
}
});
// Wait for all running snapshots to be cleared from cluster state
assertBusy(() -> {
logger.info("--> waiting for cluster state to be clear of snapshots");
ClusterState state = client().admin().cluster().prepareState().setCustoms(true).get().getState();
assertTrue("cluster state was not ready for deletion " + state, SnapshotRetentionTask.okayToDeleteSnapshots(state));
});
awaitNoMoreRunningOperations(randomFrom(dataNodeNames));
logger.info("--> indexing more docs to force new segment files");
for (int i = 0; i < docCount; i++) {
@@ -211,11 +200,10 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
final String secondSnapName = executePolicy(policyId);
logger.info("--> executed policy, got snapname [{}]", secondSnapName);
// Check that the executed snapshot shows up in the SLM output as in_progress
assertBusy(() -> {
logger.info("--> Waiting for at least one data node to hit the block");
assertTrue(dataNodeNames.stream().anyMatch(node -> checkBlocked(node, REPO)));
waitForBlockOnAnyDataNode(REPO, TimeValue.timeValueSeconds(30L));
assertBusy(() -> {
logger.info("--> at least one data node has hit the block");
GetSnapshotLifecycleAction.Response getResp =
client().execute(GetSnapshotLifecycleAction.INSTANCE, new GetSnapshotLifecycleAction.Request(policyId)).get();
@@ -238,7 +226,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
new ExecuteSnapshotRetentionAction.Request()).get().isAcknowledged());
logger.info("--> unblocking snapshots");
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
unblockAllDataNodes(REPO);
// Check that the snapshot created by the policy has been removed by retention
@@ -275,7 +263,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
assertThat(resp.getHits().getTotalHits().value, equalTo(2L));
});
} finally {
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
unblockAllDataNodes(REPO);
}
}
@@ -295,9 +283,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
final SnapshotState expectedUnsuccessfulState = partialSuccess ? SnapshotState.PARTIAL : SnapshotState.FAILED;
// Setup
createAndPopulateIndex(indexName);
// Create a snapshot repo
initializeRepo(REPO);
createRepository(REPO, "mock");
createSnapshotPolicy(policyId, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true,
partialSuccess, new SnapshotRetentionConfiguration(null, 1, 2));
@@ -355,7 +341,7 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
createAndPopulateIndex(indexName);
logger.info("--> unblocking snapshots");
unblockRepo(REPO);
unblockNode(REPO, internalCluster().getMasterName());
unblockAllDataNodes(REPO);
logger.info("--> taking new snapshot");
@@ -405,18 +391,14 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
});
}
awaitNoMoreRunningOperations(internalCluster().getMasterName());
}
public void testSLMRetentionAfterRestore() throws Exception {
final String indexName = "test";
final String policyName = "test-policy";
int docCount = 20;
for (int i = 0; i < docCount; i++) {
index(indexName, "_doc", i + "", Collections.singletonMap("foo", "bar"));
}
// Create a snapshot repo
initializeRepo(REPO);
indexRandomDocs(indexName, 20);
createRepository(REPO, "mock");
logger.info("--> creating policy {}", policyName);
createSnapshotPolicy(policyName, "snap", NEVER_EXECUTE_CRON_SCHEDULE, REPO, indexName, true, false,
@@ -475,27 +457,9 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
private void createAndPopulateIndex(String indexName) throws InterruptedException {
logger.info("--> creating and populating index [{}]", indexName);
assertAcked(prepareCreate(indexName, 0, Settings.builder()
.put("number_of_shards", 6).put("number_of_replicas", 0)));
assertAcked(prepareCreate(indexName, 0, indexSettingsNoReplicas(6)));
ensureGreen();
final int numdocs = randomIntBetween(50, 100);
IndexRequestBuilder[] builders = new IndexRequestBuilder[numdocs];
for (int i = 0; i < builders.length; i++) {
builders[i] = client().prepareIndex(indexName, SINGLE_MAPPING_NAME, Integer.toString(i)).setSource("field1", "bar " + i);
}
indexRandom(true, builders);
flushAndRefresh();
}
private void initializeRepo(String repoName) {
client().admin().cluster().preparePutRepository(repoName)
.setType("mock")
.setSettings(Settings.builder()
.put("compress", randomBoolean())
.put("location", randomAlphaOfLength(6))
.build())
.get();
indexRandomDocs(indexName, randomIntBetween(50, 100));
}
private void createSnapshotPolicy(String policyName, String snapshotNamePattern, String schedule, String REPO,
@@ -546,49 +510,4 @@ public class SLMSnapshotBlockingIntegTests extends ESIntegTestCase {
return "bad";
}
}
public static String blockMasterFromFinalizingSnapshotOnIndexFile(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).setBlockOnWriteIndexFile(true);
return masterName;
}
public static String unblockRepo(final String repositoryName) {
final String masterName = internalCluster().getMasterName();
((MockRepository)internalCluster().getInstance(RepositoriesService.class, masterName)
.repository(repositoryName)).unblock();
return masterName;
}
public static void blockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).blockOnDataFiles(true);
}
}
public static void unblockAllDataNodes(String repository) {
for(RepositoriesService repositoriesService : internalCluster().getDataNodeInstances(RepositoriesService.class)) {
((MockRepository)repositoriesService.repository(repository)).unblock();
}
}
public void waitForBlock(String node, String repository, TimeValue timeout) throws InterruptedException {
long start = System.currentTimeMillis();
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
while (System.currentTimeMillis() - start < timeout.millis()) {
if (mockRepository.blocked()) {
return;
}
Thread.sleep(100);
}
fail("Timeout waiting for node [" + node + "] to be blocked");
}
public boolean checkBlocked(String node, String repository) {
RepositoriesService repositoriesService = internalCluster().getInstance(RepositoriesService.class, node);
MockRepository mockRepository = (MockRepository) repositoriesService.repository(repository);
return mockRepository.blocked();
}
}