Use StepListener to Simplify SnapshotResiliencyTests (#45233) (#45386)

* Reduces complicated callback relations in `testSuccessfulSnapshotAndRestore` to flat steps of sequential actions
* Will refactor the other tests in this suit as a follow up
   * This format certainly makes it easier to create more complicated tests that involve multiple subsequent snapshots as it would allow adding loops
This commit is contained in:
Armin Braun 2019-08-09 18:19:48 +02:00 committed by GitHub
parent 9e6d874a41
commit d1ed9bdbfd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 59 deletions

View File

@ -25,24 +25,28 @@ import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.RequestValidators;
import org.elasticsearch.action.StepListener;
import org.elasticsearch.action.admin.cluster.repositories.put.PutRepositoryAction;
import org.elasticsearch.action.admin.cluster.repositories.put.TransportPutRepositoryAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.TransportClusterRerouteAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@ -53,6 +57,7 @@ import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.index.IndexRequest;
@ -61,6 +66,7 @@ import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
@ -69,6 +75,7 @@ import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.node.NodeClient;
@ -181,7 +188,6 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@ -244,68 +250,68 @@ public class SnapshotResiliencyTests extends ESTestCase {
final int shards = randomIntBetween(1, 10);
final int documents = randomIntBetween(0, 100);
TestClusterNode masterNode =
final StepListener<AcknowledgedResponse> createRepositoryListener = new StepListener<>();
final TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
final AtomicBoolean snapshotRestored = new AtomicBoolean();
final AtomicBoolean documentCountVerified = new AtomicBoolean();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
() -> {
final Runnable afterIndexing = () ->
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(assertNoFailureListener(() -> {
createdSnapshot.set(true);
masterNode.client.admin().indices().delete(
new DeleteIndexRequest(index),
assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true),
assertNoFailureListener(restoreSnapshotResponse -> {
snapshotRestored.set(true);
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(index).source(
new SearchSourceBuilder().size(0).trackTotalHits(true)
),
assertNoFailureListener(r -> {
assertEquals(
(long) documents,
Objects.requireNonNull(r.getHits().getTotalHits()).value
);
documentCountVerified.set(true);
}));
})
)));
}));
final AtomicInteger countdown = new AtomicInteger(documents);
for (int i = 0; i < documents; ++i) {
masterNode.client.bulk(
new BulkRequest().add(new IndexRequest(index).source(
Collections.singletonMap("foo", "bar" + i)))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
assertNoFailureListener(
bulkResponse -> {
assertFalse(
"Failures in bulkresponse: " + bulkResponse.buildFailureMessage(),
bulkResponse.hasFailures());
if (countdown.decrementAndGet() == 0) {
afterIndexing.run();
}
}));
}
if (documents == 0) {
afterIndexing.run();
}
}))));
.execute(createRepositoryListener);
final StepListener<CreateIndexResponse> createIndexResponseStepListener = new StepListener<>();
createRepositoryListener.whenComplete(acknowledgedResponse -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL).settings(defaultIndexSettings(shards)),
createIndexResponseStepListener), SnapshotResiliencyTests::rethrowAssertion);
final StepListener<CreateSnapshotResponse> createSnapshotResponseListener = new StepListener<>();
createIndexResponseStepListener.whenComplete(createIndexResponse -> {
final Runnable afterIndexing = () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(createSnapshotResponseListener);
if (documents == 0) {
afterIndexing.run();
} else {
final BulkRequest bulkRequest = new BulkRequest().setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
for (int i = 0; i < documents; ++i) {
bulkRequest.add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)));
}
final StepListener<BulkResponse> bulkResponseStepListener = new StepListener<>();
masterNode.client.bulk(bulkRequest, bulkResponseStepListener);
bulkResponseStepListener.whenComplete(bulkResponse -> {
assertFalse("Failures in bulk response: " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures());
assertEquals(documents, bulkResponse.getItems().length);
afterIndexing.run();
}, SnapshotResiliencyTests::rethrowAssertion);
}
}, SnapshotResiliencyTests::rethrowAssertion);
final StepListener<AcknowledgedResponse> deleteIndexListener = new StepListener<>();
createSnapshotResponseListener.whenComplete(
createSnapshotResponse -> masterNode.client.admin().indices().delete(new DeleteIndexRequest(index), deleteIndexListener),
SnapshotResiliencyTests::rethrowAssertion);
final StepListener<RestoreSnapshotResponse> restoreSnapshotResponseListener = new StepListener<>();
deleteIndexListener.whenComplete(ignored -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), restoreSnapshotResponseListener),
SnapshotResiliencyTests::rethrowAssertion);
final StepListener<SearchResponse> searchResponseListener = new StepListener<>();
restoreSnapshotResponseListener.whenComplete(restoreSnapshotResponse -> {
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(index).source(new SearchSourceBuilder().size(0).trackTotalHits(true)), searchResponseListener);
}, SnapshotResiliencyTests::rethrowAssertion);
final AtomicBoolean documentCountVerified = new AtomicBoolean();
searchResponseListener.whenComplete(r -> {
assertEquals(documents, Objects.requireNonNull(r.getHits().getTotalHits()).value);
documentCountVerified.set(true);
}, SnapshotResiliencyTests::rethrowAssertion);
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
assertTrue(createdSnapshot.get());
assertTrue(snapshotRestored.get());
assertNotNull(createSnapshotResponseListener.result());
assertNotNull(restoreSnapshotResponseListener.result());
assertTrue(documentCountVerified.get());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
@ -614,6 +620,10 @@ public class SnapshotResiliencyTests extends ESTestCase {
.put(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.getKey(), 0).build();
}
private static void rethrowAssertion(Exception e) {
throw new AssertionError(e);
}
private static <T> ActionListener<T> assertNoFailureListener(Consumer<T> consumer) {
return new ActionListener<T>() {
@Override