Enhance SnapshotResiliencyTests (#49514) (#49541)

A few enhancements to `SnapshotResiliencyTests`:
1. Test running requests from random nodes in more spots to enhance coverage (this is particularly motivated by #49060 where the additional number of cluster state updates makes it more interesting to fully cover all kinds of network failures)
2. Fix issue with restarting only master node in one test (doing so breaks the test at an incredibly low frequency, that becomes not so low in #49060 with the additional cluster state updates between request and response)
3. Improved cluster formation checks (now properly checks the term as well when forming cluster) + makes sure all nodes are connected to all other nodes (previously the data nodes would at times not be connected to other data nodes, which was shaken out now by adding the `client()` method
4. Make sure the cluster left behind by the test makes sense by running the repo cleanup action on it (this also increases coverage of the repository cleanup action obviously and adds the basis of making it part of more resiliency tests)
This commit is contained in:
Armin Braun 2019-11-25 13:31:28 +01:00 committed by GitHub
parent c149c64dc4
commit 2502ff39a0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 85 additions and 54 deletions

View File

@ -21,11 +21,16 @@ package org.elasticsearch.snapshots;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.RequestValidators;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.CleanupRepositoryResponse;
import org.elasticsearch.action.admin.cluster.repositories.cleanup.TransportCleanupRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
@ -236,6 +241,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
@After
public void verifyReposThenStopServices() {
try {
clearDisruptionsAndAwaitSync();
final StepListener<CleanupRepositoryResponse> cleanupResponse = new StepListener<>();
client().admin().cluster().cleanupRepository(
new CleanupRepositoryRequest("repo"), cleanupResponse);
final AtomicBoolean cleanedUp = new AtomicBoolean(false);
continueOrDie(cleanupResponse, r -> cleanedUp.set(true));
runUntil(cleanedUp::get, TimeUnit.MINUTES.toMillis(1L));
if (blobStoreContext != null) {
blobStoreContext.forceConsistent();
}
@ -261,8 +276,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
final Runnable afterIndexing = () -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseListener);
if (documents == 0) {
afterIndexing.run();
@ -272,7 +287,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
}
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
masterNode.client.bulk(bulkRequest, bulkResponseStepListener);
client().bulk(bulkRequest, bulkResponseStepListener);
continueOrDie(bulkResponseStepListener, bulkResponse -> {
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
assertEquals(documents, bulkResponse.getItems().length);
@ -284,16 +299,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<AcknowledgedResponse> deleteIndexListener = new StepListener<>();
continueOrDie(createSnapshotResponseListener,
createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener));
createSnapshotResponse -> client().admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener));
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot(
continueOrDie(deleteIndexListener, ignored -> client().admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener));
final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
client().search(
new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener);
});
@ -322,33 +337,33 @@ public class SnapshotResiliencyTests extends ESTestCase {
public void testSnapshotWithNodeDisconnects() {
final int dataNodes = randomIntBetween(2, 10);
setupTestCluster(randomFrom(1, 3, 5), dataNodes);
final int masterNodes = randomFrom(1, 3, 5);
setupTestCluster(masterNodes, dataNodes);
String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);
TestClusterNodes.TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
scheduleNow(this::disconnectRandomDataNode);
}
if (randomBoolean()) {
scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
}
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
testClusterNodes.randomMasterNodeSafe().client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
});
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
scheduleNow(this::disconnectOrRestartDataNode);
}
final boolean disconnectedMaster = randomBoolean();
// Only disconnect master if we have more than a single master and can simulate a failover
final boolean disconnectedMaster = randomBoolean() && masterNodes > 1;
if (disconnectedMaster) {
scheduleNow(this::disconnectOrRestartMasterNode);
}
@ -388,18 +403,18 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
continueOrDie(createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(createSnapshotResponseStepListener));
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot(
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener));
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> client().admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true).execute(createAnotherSnapshotResponseStepListener));
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
@ -433,24 +448,26 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
continueOrDie(createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createSnapshotResponseStepListener,
createSnapshotResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
.execute(createOtherSnapshotResponseStepListener));
final StepListener<Boolean> deleteSnapshotStepListener = new StepListener<>();
continueOrDie(createOtherSnapshotResponseStepListener,
createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot(
createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName), ActionListener.wrap(
resp -> deleteSnapshotStepListener.onResponse(true),
e -> {
assertThat(e, instanceOf(ConcurrentSnapshotExecutionException.class));
final Throwable unwrapped =
ExceptionsHelper.unwrap(e, ConcurrentSnapshotExecutionException.class);
assertThat(unwrapped, instanceOf(ConcurrentSnapshotExecutionException.class));
deleteSnapshotStepListener.onResponse(false);
})));
@ -459,8 +476,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
continueOrDie(deleteSnapshotStepListener, deleted -> {
if (deleted) {
// The delete worked out, creating a third snapshot
masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
.execute(createAnotherSnapshotResponseStepListener);
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
@ -471,13 +487,12 @@ public class SnapshotResiliencyTests extends ESTestCase {
deterministicTaskQueue.runAllRunnableTasks();
final CreateSnapshotResponse thirdSnapshotResponse = createAnotherSnapshotResponseStepListener.result();
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
assertThat(snapshotIds, hasSize(thirdSnapshotResponse == null ? 2 : 3));
// We end up with two snapshots no matter if the delete worked out or not
assertThat(snapshotIds, hasSize(2));
for (SnapshotId snapshotId : snapshotIds) {
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
@ -509,8 +524,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<ClusterStateResponse> clusterStateResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
continueOrDie(createRepoAndIndex(repoName, index, shards),
createIndexResponse -> client().admin().cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
@ -580,19 +595,18 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
continueOrDie(createRepoAndIndex(repoName, index, shards), createIndexResponse -> {
final AtomicBoolean initiatedSnapshot = new AtomicBoolean(false);
for (int i = 0; i < documents; ++i) {
// Index a few documents with different field names so we trigger a dynamic mapping update for each of them
masterNode.client.bulk(
new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar")))
client().bulk(new BulkRequest().add(new IndexRequest(index).source(Collections.singletonMap("foo" + i, "bar")))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
assertNoFailureListener(
bulkResponse -> {
assertFalse("Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
if (initiatedSnapshot.compareAndSet(false, true)) {
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener);
client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName).setWaitForCompletion(true)
.execute(createSnapshotResponseStepListener);
}
}));
}
@ -602,7 +616,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().restoreSnapshot(
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName)
.renamePattern(index).renameReplacement(restoredIndex).waitForCompletion(true), restoreSnapshotResponseStepListener));
@ -610,8 +624,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
continueOrDie(restoreSnapshotResponseStepListener, restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)),
client().search(new SearchRequest(restoredIndex).source(new SearchSourceBuilder().size(documents).trackTotalHits(true)),
searchResponseStepListener);
});
@ -653,18 +666,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
return res.actionGet();
}
private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNodes.TestClusterNode masterNode, String repoName,
String index, int shards) {
final AdminClient adminClient = masterNode.client.admin();
private StepListener<CreateIndexResponse> createRepoAndIndex(String repoName, String index, int shards) {
final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE)
client().admin().cluster().preparePutRepository(repoName).setType(FsRepository.TYPE)
.setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener);
final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create(
continueOrDie(createRepositoryListener, acknowledgedResponse -> client().admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
createIndexResponseStepListener));
@ -673,11 +683,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
private void clearDisruptionsAndAwaitSync() {
testClusterNodes.clearNetworkDisruptions();
runUntil(() -> {
final List<Long> versions = testClusterNodes.nodes.values().stream()
.map(n -> n.clusterService.state().version()).distinct().collect(Collectors.toList());
return versions.size() == 1L;
}, TimeUnit.MINUTES.toMillis(1L));
stabilize();
}
private void disconnectOrRestartDataNode() {
@ -714,15 +720,25 @@ public class SnapshotResiliencyTests extends ESTestCase {
.filter(DiscoveryNode::isMasterNode).map(DiscoveryNode::getId).collect(Collectors.toSet()));
testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode()).forEach(
testClusterNode -> testClusterNode.coordinator.setInitialConfiguration(votingConfiguration));
// Connect all nodes to each other
testClusterNodes.nodes.values().forEach(node -> testClusterNodes.nodes.values().forEach(
n -> n.transportService.connectToNode(node.node, null,
ActionTestUtils.assertNoFailureListener(c -> logger.info("--> Connected [{}] to [{}]", n.node, node.node)))));
stabilize();
}
private void stabilize() {
runUntil(
() -> {
List<String> masterNodeIds = testClusterNodes.nodes.values().stream()
.map(node -> node.clusterService.state().nodes().getMasterNodeId())
.distinct().collect(Collectors.toList());
return masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false;
final Collection<ClusterState> clusterStates =
testClusterNodes.nodes.values().stream().map(node -> node.clusterService.state()).collect(Collectors.toList());
final Set<String> masterNodeIds = clusterStates.stream()
.map(clusterState -> clusterState.nodes().getMasterNodeId()).collect(Collectors.toSet());
final Set<Long> terms = clusterStates.stream().map(ClusterState::term).collect(Collectors.toSet());
final List<Long> versions = clusterStates.stream().map(ClusterState::version).distinct().collect(Collectors.toList());
return versions.size() == 1 && masterNodeIds.size() == 1 && masterNodeIds.contains(null) == false && terms.size() == 1;
},
TimeUnit.SECONDS.toMillis(30L)
TimeUnit.MINUTES.toMillis(1L)
);
}
@ -768,6 +784,17 @@ public class SnapshotResiliencyTests extends ESTestCase {
return ActionListener.wrap(() -> {});
}
public NodeClient client() {
// Select from sorted list of nodes
final List<TestClusterNodes.TestClusterNode> nodes = testClusterNodes.nodes.values().stream()
.filter(n -> testClusterNodes.disconnectedNodes.contains(n.node.getName()) == false)
.sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList());
if (nodes.isEmpty()) {
throw new AssertionError("No nodes available");
}
return randomFrom(nodes).client;
}
/**
* Create a {@link Environment} with random path.home and path.repo
**/
@ -843,7 +870,9 @@ public class SnapshotResiliencyTests extends ESTestCase {
public Optional<TestClusterNode> randomMasterNode() {
// Select from sorted list of data-nodes here to not have deterministic behaviour
final List<TestClusterNode> masterNodes = testClusterNodes.nodes.values().stream().filter(n -> n.node.isMasterNode())
final List<TestClusterNode> masterNodes = testClusterNodes.nodes.values().stream()
.filter(n -> n.node.isMasterNode())
.filter(n -> disconnectedNodes.contains(n.node.getName()) == false)
.sorted(Comparator.comparing(n -> n.node.getName())).collect(Collectors.toList());
return masterNodes.isEmpty() ? Optional.empty() : Optional.of(randomFrom(masterNodes));
}
@ -1187,6 +1216,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
transportService, clusterService, repositoriesService, threadPool,
actionFilters, indexNameExpressionResolver
));
actions.put(CleanupRepositoryAction.INSTANCE, new TransportCleanupRepositoryAction(transportService, clusterService,
repositoriesService, threadPool, actionFilters, indexNameExpressionResolver));
actions.put(CreateSnapshotAction.INSTANCE,
new TransportCreateSnapshotAction(
transportService, clusterService, threadPool,