Clean up Callback Chains and Duplication in SnapshotResiliencyTests (#45398) (#45563)

* It's in the title; follow-up to #45233
* Flatten more listeners into `StepListener`
* Remove duplication from the repo and index bootstrap and assert that the steps execute successfully (a minimal sketch of the flattened listener pattern follows the file stats below)
Armin Braun authored on 2019-08-14 21:53:07 +02:00 (committed by GitHub)
parent 8fdbcd7395
commit e0d84e7178
1 changed file with 135 additions and 192 deletions
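For readers skimming the diff, here is a minimal, self-contained sketch of the flattened listener pattern the commit moves the test toward. `StepChainSketch`, `MiniStepListener`, and the string payloads are stand-ins invented for this sketch (the real test uses `org.elasticsearch.action.StepListener` and actual client calls); only the shape of `continueOrDie(...)` mirrors the helper added in the diff.

```java
// Self-contained sketch of the callback-flattening pattern this commit applies.
// MiniStepListener and the string "steps" are invented stand-ins; only the shape
// of continueOrDie(...) mirrors the helper added to SnapshotResiliencyTests.
import java.util.function.Consumer;

public class StepChainSketch {

    /** Tiny stand-in for StepListener: remembers its result and forwards it to the next step. */
    static final class MiniStepListener<T> {
        private Consumer<T> nextStep = t -> {};
        private Consumer<Exception> failureHandler = e -> {};
        private T result;

        /** Register what happens once this step completes, successfully or not. */
        void whenComplete(Consumer<T> onResponse, Consumer<Exception> onFailure) {
            this.nextStep = onResponse;
            this.failureHandler = onFailure;
        }

        void onResponse(T value) {      // the async operation finished successfully
            result = value;
            nextStep.accept(value);
        }

        void onFailure(Exception e) {   // the async operation failed
            failureHandler.accept(e);
        }

        T result() {
            return result;
        }
    }

    /** Mirrors the continueOrDie(...) helper from the diff: any failure fails the test. */
    static <T> void continueOrDie(MiniStepListener<T> listener, Consumer<T> onResponse) {
        listener.whenComplete(onResponse, e -> {
            throw new AssertionError(e);
        });
    }

    public static void main(String[] args) {
        // One listener per step; the chain reads top to bottom instead of nesting inward.
        MiniStepListener<String> createRepoStep = new MiniStepListener<>();
        MiniStepListener<String> createIndexStep = new MiniStepListener<>();
        MiniStepListener<String> createSnapshotStep = new MiniStepListener<>();

        continueOrDie(createRepoStep, repo -> createIndexStep.onResponse("index created"));
        continueOrDie(createIndexStep, index -> createSnapshotStep.onResponse("snapshot created"));
        continueOrDie(createSnapshotStep, snapshot -> System.out.println("done: " + snapshot));

        // Kick off the chain; in the real test this is an async cluster/client call.
        createRepoStep.onResponse("repo created");

        // As the test now does, assert that the final step actually produced a result.
        if (createSnapshotStep.result() == null) {
            throw new AssertionError("snapshot step never completed");
        }
    }
}
```

Compared with the removed `assertNoFailureListener(...)` nesting, each step's callback sits at the same indentation level, and the test can check at the end (via `result()`) that every step actually completed.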


@@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -116,6 +117,7 @@ import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.FakeThreadPoolMasterService;
import org.elasticsearch.cluster.service.MasterService;
import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkModule;
@@ -188,7 +190,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -247,25 +248,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);
final int documents = randomIntBetween(0, 100);
final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
final TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(createRepositoryListener);
final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
createRepositoryListener.whenComplete(acknowledgedResponse -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
createIndexResponseStepListener), SnapshotResiliencyTests::rethrowAssertion);
final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
createIndexResponseStepListener.whenComplete(createIndexResponse -> {
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseListener);
if (documents == 0) {
@@ -277,37 +268,35 @@ public class SnapshotResiliencyTests extends ESTestCase {
}
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
masterNode.client.bulk(bulkRequest, bulkResponseStepListener);
bulkResponseStepListener.whenComplete(bulkResponse -> {
continueOrDie(bulkResponseStepListener, bulkResponse -> {
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
assertEquals(documents, bulkResponse.getItems().length);
afterIndexing.run();
}, SnapshotResiliencyTests::rethrowAssertion);
});
}
}, SnapshotResiliencyTests::rethrowAssertion);
});
final StepListener<AcknowledgedResponse> deleteIndexListener = new StepListener<>();
createSnapshotResponseListener.whenComplete(
createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener),
SnapshotResiliencyTests::rethrowAssertion);
continueOrDie(createSnapshotResponseListener,
createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener));
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
deleteIndexListener.whenComplete(ignored -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener),
SnapshotResiliencyTests::rethrowAssertion);
continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener));
final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
restoreSnapshotResponseListener.whenComplete(restoreSnapshotResponse -> {
continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener);
}, SnapshotResiliencyTests::rethrowAssertion);
});
final AtomicBoolean documentCountVerified = new AtomicBoolean();
searchResponseListener.whenComplete(r -> {
continueOrDie(searchResponseListener, r -> {
assertEquals(documents, Objects.requireNonNull(r.getHits().getTotalHits()).value);
documentCountVerified.set(true);
}, SnapshotResiliencyTests::rethrowAssertion);
});
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
assertNotNull(createSnapshotResponseListener.result());
@@ -333,31 +322,24 @@ public class SnapshotResiliencyTests extends ESTestCase {
String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);
TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
final AdminClient masterAdminClient = masterNode.client.admin();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
() -> {
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> {
for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
scheduleNow(this::disconnectRandomDataNode);
}
if (randomBoolean()) {
scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
}
masterAdminClient.cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(assertNoFailureListener(() -> {
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener);
});
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> {
for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) {
scheduleNow(this::disconnectOrRestartDataNode);
}
@@ -370,22 +352,15 @@ public class SnapshotResiliencyTests extends ESTestCase {
} else if (randomBoolean()) {
scheduleNow(() -> testClusterNodes.clearNetworkDisruptions());
}
createdSnapshot.set(true);
}));
}))));
});
runUntil(() -> {
final Optional<TestClusterNode> randomMaster = testClusterNodes.randomMasterNode();
if (randomMaster.isPresent()) {
final SnapshotsInProgress snapshotsInProgress = randomMaster.get().clusterService.state().custom(SnapshotsInProgress.TYPE);
runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
return snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty();
}
return false;
}, TimeUnit.MINUTES.toMillis(1L));
}).orElse(false), TimeUnit.MINUTES.toMillis(1L));
clearDisruptionsAndAwaitSync();
assertTrue(createdSnapshot.get());
final TestClusterNode randomMaster = testClusterNodes.randomMasterNode()
.orElseThrow(() -> new AssertionError("expected to find at least one active master node"));
SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE);
@@ -401,32 +376,32 @@ public class SnapshotResiliencyTests extends ESTestCase {
String repoName = "repo";
String snapshotName = "snapshot";
final String index = "test";
final int shards = randomIntBetween(1, 10);
TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
() -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(assertNoFailureListener(
() -> masterNode.client.admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName),
assertNoFailureListener(() -> masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(
assertNoFailureListener(() -> createdSnapshot.set(true))
)))))))));
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(createSnapshotResponseStepListener));
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener));
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener));
continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse ->
assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS));
deterministicTaskQueue.runAllRunnableTasks();
assertTrue(createdSnapshot.get());
assertNotNull(createAnotherSnapshotResponseStepListener.result());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
@@ -458,69 +433,54 @@ public class SnapshotResiliencyTests extends ESTestCase {
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
final AdminClient masterAdminClient = masterNode.client.admin();
masterAdminClient.cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
assertNoFailureListener(
() -> masterAdminClient.indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
() -> masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener(
clusterStateResponse -> {
final ShardRouting shardToRelocate =
clusterStateResponse.getState().routingTable().allShards(index).get(0);
final TestClusterNode currentPrimaryNode =
testClusterNodes.nodeById(shardToRelocate.currentNodeId());
final TestClusterNode otherNode =
testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName());
final Runnable maybeForceAllocate = new Runnable() {
final StepListener<ClusterStateResponse> clusterStateResponseStepListener = new StepListener<>();
continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards),
createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener));
continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> {
final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0);
final TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId());
final TestClusterNode otherNode = testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName());
scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode));
scheduleNow(new Runnable() {
@Override
public void run() {
masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener(
resp -> {
final ShardRouting shardRouting = resp.getState().routingTable()
.shardRoutingTable(shardToRelocate.shardId()).primaryShard();
if (shardRouting.unassigned()
&& shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) {
final StepListener<ClusterStateResponse> updatedClusterStateResponseStepListener = new StepListener<>();
masterAdminClient.cluster().state(new ClusterStateRequest(), updatedClusterStateResponseStepListener);
continueOrDie(updatedClusterStateResponseStepListener, updatedClusterState -> {
final ShardRouting shardRouting =
updatedClusterState.getState().routingTable().shardRoutingTable(shardToRelocate.shardId()).primaryShard();
if (shardRouting.unassigned() && shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) {
if (masterNodeCount > 1) {
scheduleNow(() -> testClusterNodes.stopNode(masterNode));
}
testClusterNodes.randomDataNodeSafe().client.admin().cluster()
.prepareCreateSnapshot(repoName, snapshotName)
testClusterNodes.randomDataNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(ActionListener.wrap(() -> {
testClusterNodes.randomDataNodeSafe().client.admin().cluster()
.deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
createdSnapshot.set(true);
testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot(
new DeleteSnapshotRequest(repoName, snapshotName), noopListener());
}));
scheduleNow(
() -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute(
new ClusterRerouteRequest().add(
new AllocateEmptyPrimaryAllocationCommand(
index, shardRouting.shardId().id(), otherNode.node.getName(), true)
), noopListener()));
new ClusterRerouteRequest().add(new AllocateEmptyPrimaryAllocationCommand(
index, shardRouting.shardId().id(), otherNode.node.getName(), true)), noopListener()));
} else {
scheduleSoon(this);
}
});
}
));
}
};
scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode));
scheduleNow(maybeForceAllocate);
}
))))));
});
});
runUntil(() -> {
final Optional<TestClusterNode> randomMaster = testClusterNodes.randomMasterNode();
if (randomMaster.isPresent()) {
final SnapshotsInProgress snapshotsInProgress =
randomMaster.get().clusterService.state().custom(SnapshotsInProgress.TYPE);
return (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) && createdSnapshot.get();
}
runUntil(() -> testClusterNodes.randomMasterNode().map(master -> {
if (createdSnapshot.get() == false) {
return false;
}, TimeUnit.MINUTES.toMillis(1L));
}
final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE);
return snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty();
}).orElse(false), TimeUnit.MINUTES.toMillis(1L));
clearDisruptionsAndAwaitSync();
@@ -533,6 +493,23 @@ public class SnapshotResiliencyTests extends ESTestCase {
assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0)));
}
private StepListener<CreateIndexResponse> createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) {
final AdminClient adminClient = masterNode.client.admin();
final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE)
.setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener);
final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
createIndexResponseStepListener));
return createIndexResponseStepListener;
}
private void clearDisruptionsAndAwaitSync() {
testClusterNodes.clearNetworkDisruptions();
runUntil(() -> {
@@ -620,48 +597,14 @@ public class SnapshotResiliencyTests extends ESTestCase {
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0).build();
}
private static void rethrowAssertion(Exception e) {
private static <T> void continueOrDie(StepListener<T> listener, CheckedConsumer<T, Exception> onResponse) {
listener.whenComplete(onResponse, e -> {
throw new AssertionError(e);
}
private static <T> ActionListener<T> assertNoFailureListener(Consumer<T> consumer) {
return new ActionListener<T>() {
@Override
public void onResponse(final T t) {
consumer.accept(t);
}
@Override
public void onFailure(final Exception e) {
throw new AssertionError(e);
}
};
}
private static <T> ActionListener<T> assertNoFailureListener(Runnable r) {
return new ActionListener<T>() {
@Override
public void onResponse(final T t) {
r.run();
}
@Override
public void onFailure(final Exception e) {
throw new AssertionError(e);
}
};
});
}
private static <T> ActionListener<T> noopListener() {
return new ActionListener<T>() {
@Override
public void onResponse(final T t) {
}
@Override
public void onFailure(final Exception e) {
}
};
return ActionListener.wrap(() -> {});
}
/**
@@ -688,7 +631,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
// LinkedHashMap so we have deterministic ordering when iterating over the map in tests
private final Map<String, TestClusterNode> nodes = new LinkedHashMap<>();
private DisconnectedNodes disruptedLinks = new DisconnectedNodes();
private final DisconnectedNodes disruptedLinks = new DisconnectedNodes();
TestClusterNodes(int masterNodes, int dataNodes) {
for (int i = 0; i < masterNodes; ++i) {