From e0d84e7178e6a09a7a4ab15bcbd8feb9cc393acd Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 14 Aug 2019 21:53:07 +0200 Subject: [PATCH] Clean up Callback Chains and Duplicate in SnapshotResiliencyTests (#45398) (#45563) * It's in the title, follow up to #45233 * Flatten more listeners into `StepListener` * Remove duplication from repo and index bootstrap and asserting that the steps execute successfully --- .../snapshots/SnapshotResiliencyTests.java | 327 ++++++++---------- 1 file changed, 135 insertions(+), 192 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 47426396948..cadc93b9aaa 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -43,6 +43,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotR import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; +import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; @@ -116,6 +117,7 @@ import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.FakeThreadPoolMasterService; import org.elasticsearch.cluster.service.MasterService; +import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; @@ -188,7 +190,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -247,25 +248,15 @@ public class SnapshotResiliencyTests extends ESTestCase { String repoName = "repo"; String snapshotName = "snapshot"; final String index = "test"; - final int shards = randomIntBetween(1, 10); final int documents = randomIntBetween(0, 100); - final StepListener createRepositoryListener = new StepListener<>(); - final TestClusterNode masterNode = testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); - masterNode.client.admin().cluster().preparePutRepository(repoName) - .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) - .execute(createRepositoryListener); - - final StepListener createIndexResponseStepListener = new StepListener<>(); - createRepositoryListener.whenComplete(acknowledgedResponse -> masterNode.client.admin().indices().create( - new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)), - createIndexResponseStepListener), SnapshotResiliencyTests::rethrowAssertion); final StepListener createSnapshotResponseListener = new StepListener<>(); - createIndexResponseStepListener.whenComplete(createIndexResponse -> { + + continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) .setWaitForCompletion(true).execute(createSnapshotResponseListener); if (documents == 0) { @@ -277,37 +268,35 @@ public class SnapshotResiliencyTests extends ESTestCase { } final StepListener bulkResponseStepListener = new StepListener<>(); masterNode.client.bulk(bulkRequest, bulkResponseStepListener); - bulkResponseStepListener.whenComplete(bulkResponse -> { + continueOrDie(bulkResponseStepListener, bulkResponse -> { assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures()); assertEquals(documents, bulkResponse.getItems().length); afterIndexing.run(); - }, SnapshotResiliencyTests::rethrowAssertion); + }); } - }, SnapshotResiliencyTests::rethrowAssertion); + }); final StepListener deleteIndexListener = new StepListener<>(); - createSnapshotResponseListener.whenComplete( - createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener), - SnapshotResiliencyTests::rethrowAssertion); + continueOrDie(createSnapshotResponseListener, + createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener)); final StepListener restoreSnapshotResponseListener = new StepListener<>(); - deleteIndexListener.whenComplete(ignored -> masterNode.client.admin().cluster().restoreSnapshot( - new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener), - SnapshotResiliencyTests::rethrowAssertion); + continueOrDie(deleteIndexListener, ignored -> masterNode.client.admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener)); final StepListener searchResponseListener = new StepListener<>(); - restoreSnapshotResponseListener.whenComplete(restoreSnapshotResponse -> { - assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); - masterNode.client.search( - new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener); - }, SnapshotResiliencyTests::rethrowAssertion); + continueOrDie(restoreSnapshotResponseListener, restoreSnapshotResponse -> { + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + masterNode.client.search( + new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener); + }); final AtomicBoolean documentCountVerified = new AtomicBoolean(); - searchResponseListener.whenComplete(r -> { + continueOrDie(searchResponseListener, r -> { assertEquals(documents, Objects.requireNonNull(r.getHits().getTotalHits()).value); documentCountVerified.set(true); - }, SnapshotResiliencyTests::rethrowAssertion); + }); runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L)); assertNotNull(createSnapshotResponseListener.result()); @@ -333,59 +322,45 @@ public class SnapshotResiliencyTests extends ESTestCase { String repoName = "repo"; String snapshotName = "snapshot"; final String index = "test"; - final int shards = randomIntBetween(1, 10); TestClusterNode masterNode = testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); - final AtomicBoolean createdSnapshot = new AtomicBoolean(); - final AdminClient masterAdminClient = masterNode.client.admin(); - masterNode.client.admin().cluster().preparePutRepository(repoName) - .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) - .execute( - assertNoFailureListener( - () -> masterNode.client.admin().indices().create( - new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) - .settings(defaultIndexSettings(shards)), - assertNoFailureListener( - () -> { - for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { - scheduleNow(this::disconnectRandomDataNode); - } - if (randomBoolean()) { - scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); - } - masterAdminClient.cluster().prepareCreateSnapshot(repoName, snapshotName) - .execute(assertNoFailureListener(() -> { - for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { - scheduleNow(this::disconnectOrRestartDataNode); - } - final boolean disconnectedMaster = randomBoolean(); - if (disconnectedMaster) { - scheduleNow(this::disconnectOrRestartMasterNode); - } - if (disconnectedMaster || randomBoolean()) { - scheduleSoon(() -> testClusterNodes.clearNetworkDisruptions()); - } else if (randomBoolean()) { - scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); - } - createdSnapshot.set(true); - })); - })))); + final StepListener createSnapshotResponseStepListener = new StepListener<>(); - runUntil(() -> { - final Optional randomMaster = testClusterNodes.randomMasterNode(); - if (randomMaster.isPresent()) { - final SnapshotsInProgress snapshotsInProgress = randomMaster.get().clusterService.state().custom(SnapshotsInProgress.TYPE); - return snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty(); + continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), createIndexResponse -> { + for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { + scheduleNow(this::disconnectRandomDataNode); } - return false; - }, TimeUnit.MINUTES.toMillis(1L)); + if (randomBoolean()) { + scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); + } + masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName).execute(createSnapshotResponseStepListener); + }); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> { + for (int i = 0; i < randomIntBetween(0, dataNodes); ++i) { + scheduleNow(this::disconnectOrRestartDataNode); + } + final boolean disconnectedMaster = randomBoolean(); + if (disconnectedMaster) { + scheduleNow(this::disconnectOrRestartMasterNode); + } + if (disconnectedMaster || randomBoolean()) { + scheduleSoon(() -> testClusterNodes.clearNetworkDisruptions()); + } else if (randomBoolean()) { + scheduleNow(() -> testClusterNodes.clearNetworkDisruptions()); + } + }); + + runUntil(() -> testClusterNodes.randomMasterNode().map(master -> { + final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE); + return snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty(); + }).orElse(false), TimeUnit.MINUTES.toMillis(1L)); clearDisruptionsAndAwaitSync(); - assertTrue(createdSnapshot.get()); final TestClusterNode randomMaster = testClusterNodes.randomMasterNode() .orElseThrow(() -> new AssertionError("expected to find at least one active master node")); SnapshotsInProgress finalSnapshotsInProgress = randomMaster.clusterService.state().custom(SnapshotsInProgress.TYPE); @@ -401,32 +376,32 @@ public class SnapshotResiliencyTests extends ESTestCase { String repoName = "repo"; String snapshotName = "snapshot"; final String index = "test"; - final int shards = randomIntBetween(1, 10); TestClusterNode masterNode = testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); - final AtomicBoolean createdSnapshot = new AtomicBoolean(); - masterNode.client.admin().cluster().preparePutRepository(repoName) - .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) - .execute( - assertNoFailureListener( - () -> masterNode.client.admin().indices().create( - new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) - .settings(defaultIndexSettings(shards)), - assertNoFailureListener( - () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .execute(assertNoFailureListener( - () -> masterNode.client.admin().cluster().deleteSnapshot( - new DeleteSnapshotRequest(repoName, snapshotName), - assertNoFailureListener(() -> masterNode.client.admin().cluster() - .prepareCreateSnapshot(repoName, snapshotName).execute( - assertNoFailureListener(() -> createdSnapshot.set(true)) - ))))))))); + + final StepListener createSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), + createIndexResponse -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .execute(createSnapshotResponseStepListener)); + + final StepListener deleteSnapshotStepListener = new StepListener<>(); + + continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> masterNode.client.admin().cluster().deleteSnapshot( + new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); + + final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); + + continueOrDie(deleteSnapshotStepListener, acknowledgedResponse -> masterNode.client.admin().cluster() + .prepareCreateSnapshot(repoName, snapshotName).execute(createAnotherSnapshotResponseStepListener)); + continueOrDie(createAnotherSnapshotResponseStepListener, createSnapshotResponse -> + assertEquals(createSnapshotResponse.getSnapshotInfo().state(), SnapshotState.SUCCESS)); deterministicTaskQueue.runAllRunnableTasks(); - assertTrue(createdSnapshot.get()); + assertNotNull(createAnotherSnapshotResponseStepListener.result()); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); @@ -458,69 +433,54 @@ public class SnapshotResiliencyTests extends ESTestCase { testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); final AtomicBoolean createdSnapshot = new AtomicBoolean(); final AdminClient masterAdminClient = masterNode.client.admin(); - masterAdminClient.cluster().preparePutRepository(repoName) - .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) - .execute( - assertNoFailureListener( - () -> masterAdminClient.indices().create( - new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) - .settings(defaultIndexSettings(shards)), - assertNoFailureListener( - () -> masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener( - clusterStateResponse -> { - final ShardRouting shardToRelocate = - clusterStateResponse.getState().routingTable().allShards(index).get(0); - final TestClusterNode currentPrimaryNode = - testClusterNodes.nodeById(shardToRelocate.currentNodeId()); - final TestClusterNode otherNode = - testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName()); - final Runnable maybeForceAllocate = new Runnable() { - @Override - public void run() { - masterAdminClient.cluster().state(new ClusterStateRequest(), assertNoFailureListener( - resp -> { - final ShardRouting shardRouting = resp.getState().routingTable() - .shardRoutingTable(shardToRelocate.shardId()).primaryShard(); - if (shardRouting.unassigned() - && shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) { - if (masterNodeCount > 1) { - scheduleNow(() -> testClusterNodes.stopNode(masterNode)); - } - testClusterNodes.randomDataNodeSafe().client.admin().cluster() - .prepareCreateSnapshot(repoName, snapshotName) - .execute(ActionListener.wrap(() -> { - testClusterNodes.randomDataNodeSafe().client.admin().cluster() - .deleteSnapshot( - new DeleteSnapshotRequest(repoName, snapshotName), noopListener()); - createdSnapshot.set(true); - })); - scheduleNow( - () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute( - new ClusterRerouteRequest().add( - new AllocateEmptyPrimaryAllocationCommand( - index, shardRouting.shardId().id(), otherNode.node.getName(), true) - ), noopListener())); - } else { - scheduleSoon(this); - } - } - )); - } - }; - scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode)); - scheduleNow(maybeForceAllocate); - } - )))))); - runUntil(() -> { - final Optional randomMaster = testClusterNodes.randomMasterNode(); - if (randomMaster.isPresent()) { - final SnapshotsInProgress snapshotsInProgress = - randomMaster.get().clusterService.state().custom(SnapshotsInProgress.TYPE); - return (snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty()) && createdSnapshot.get(); + final StepListener clusterStateResponseStepListener = new StepListener<>(); + + continueOrDie(createRepoAndIndex(masterNode, repoName, index, shards), + createIndexResponse -> masterAdminClient.cluster().state(new ClusterStateRequest(), clusterStateResponseStepListener)); + + continueOrDie(clusterStateResponseStepListener, clusterStateResponse -> { + final ShardRouting shardToRelocate = clusterStateResponse.getState().routingTable().allShards(index).get(0); + final TestClusterNode currentPrimaryNode = testClusterNodes.nodeById(shardToRelocate.currentNodeId()); + final TestClusterNode otherNode = testClusterNodes.randomDataNodeSafe(currentPrimaryNode.node.getName()); + scheduleNow(() -> testClusterNodes.stopNode(currentPrimaryNode)); + scheduleNow(new Runnable() { + @Override + public void run() { + final StepListener updatedClusterStateResponseStepListener = new StepListener<>(); + masterAdminClient.cluster().state(new ClusterStateRequest(), updatedClusterStateResponseStepListener); + continueOrDie(updatedClusterStateResponseStepListener, updatedClusterState -> { + final ShardRouting shardRouting = + updatedClusterState.getState().routingTable().shardRoutingTable(shardToRelocate.shardId()).primaryShard(); + if (shardRouting.unassigned() && shardRouting.unassignedInfo().getReason() == UnassignedInfo.Reason.NODE_LEFT) { + if (masterNodeCount > 1) { + scheduleNow(() -> testClusterNodes.stopNode(masterNode)); + } + testClusterNodes.randomDataNodeSafe().client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .execute(ActionListener.wrap(() -> { + createdSnapshot.set(true); + testClusterNodes.randomDataNodeSafe().client.admin().cluster().deleteSnapshot( + new DeleteSnapshotRequest(repoName, snapshotName), noopListener()); + })); + scheduleNow( + () -> testClusterNodes.randomMasterNodeSafe().client.admin().cluster().reroute( + new ClusterRerouteRequest().add(new AllocateEmptyPrimaryAllocationCommand( + index, shardRouting.shardId().id(), otherNode.node.getName(), true)), noopListener())); + } else { + scheduleSoon(this); + } + }); + } + }); + }); + + runUntil(() -> testClusterNodes.randomMasterNode().map(master -> { + if (createdSnapshot.get() == false) { + return false; } - return false; - }, TimeUnit.MINUTES.toMillis(1L)); + final SnapshotsInProgress snapshotsInProgress = master.clusterService.state().custom(SnapshotsInProgress.TYPE); + return snapshotsInProgress == null || snapshotsInProgress.entries().isEmpty(); + }).orElse(false), TimeUnit.MINUTES.toMillis(1L)); clearDisruptionsAndAwaitSync(); @@ -533,6 +493,23 @@ public class SnapshotResiliencyTests extends ESTestCase { assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } + private StepListener createRepoAndIndex(TestClusterNode masterNode, String repoName, String index, int shards) { + final AdminClient adminClient = masterNode.client.admin(); + + final StepListener createRepositoryListener = new StepListener<>(); + + adminClient.cluster().preparePutRepository(repoName).setType(FsRepository.TYPE) + .setSettings(Settings.builder().put("location", randomAlphaOfLength(10))).execute(createRepositoryListener); + + final StepListener createIndexResponseStepListener = new StepListener<>(); + + continueOrDie(createRepositoryListener, acknowledgedResponse -> adminClient.indices().create( + new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)), + createIndexResponseStepListener)); + + return createIndexResponseStepListener; + } + private void clearDisruptionsAndAwaitSync() { testClusterNodes.clearNetworkDisruptions(); runUntil(() -> { @@ -620,48 +597,14 @@ public class SnapshotResiliencyTests extends ESTestCase { .put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0).build(); } - private static void rethrowAssertion(Exception e) { - throw new AssertionError(e); - } - - private static ActionListener assertNoFailureListener(Consumer consumer) { - return new ActionListener() { - @Override - public void onResponse(final T t) { - consumer.accept(t); - } - - @Override - public void onFailure(final Exception e) { - throw new AssertionError(e); - } - }; - } - - private static ActionListener assertNoFailureListener(Runnable r) { - return new ActionListener() { - @Override - public void onResponse(final T t) { - r.run(); - } - - @Override - public void onFailure(final Exception e) { - throw new AssertionError(e); - } - }; + private static void continueOrDie(StepListener listener, CheckedConsumer onResponse) { + listener.whenComplete(onResponse, e -> { + throw new AssertionError(e); + }); } private static ActionListener noopListener() { - return new ActionListener() { - @Override - public void onResponse(final T t) { - } - - @Override - public void onFailure(final Exception e) { - } - }; + return ActionListener.wrap(() -> {}); } /** @@ -688,7 +631,7 @@ public class SnapshotResiliencyTests extends ESTestCase { // LinkedHashMap so we have deterministic ordering when iterating over the map in tests private final Map nodes = new LinkedHashMap<>(); - private DisconnectedNodes disruptedLinks = new DisconnectedNodes(); + private final DisconnectedNodes disruptedLinks = new DisconnectedNodes(); TestClusterNodes(int masterNodes, int dataNodes) { for (int i = 0; i < masterNodes; ++i) {