Add Restore Operation to SnapshotResiliencyTests (#40634) (#41546)

* Add Restore Operation to SnapshotResiliencyTests

* Expand the successful snapshot test case to also include restoring the snapshot
  * Also index documents so that the restore can be verified meaningfully (a sketch of the resulting flow follows below)
* This is part of the larger effort to test eventually consistent blob stores in #39504
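The following is a minimal, blocking sketch of the flow this change adds to testSuccessfulSnapshotAndRestore: create a repository and index, index documents, snapshot, delete the index, restore, and verify the document count. It assumes an externally provided org.elasticsearch.client.Client; the class name SnapshotRestoreFlowSketch, the repository path, the fixed shard settings, and the document count are placeholders standing in for the test's randomized values and its defaultIndexSettings helper. The real test issues the same requests asynchronously through listeners and drives them with runUntil on the deterministic task queue.

import java.util.Collections;

import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.search.builder.SearchSourceBuilder;

class SnapshotRestoreFlowSketch {

    static void run(Client client, String repoPath) {
        final String repoName = "repo", snapshotName = "snapshot", index = "test";
        final int documents = 10; // the test randomizes this between 0 and 100

        // 1. Register an fs repository and create the index (placeholder settings).
        client.admin().cluster().preparePutRepository(repoName)
            .setType(FsRepository.TYPE)
            .setSettings(Settings.builder().put("location", repoPath)).get();
        client.admin().indices().create(new CreateIndexRequest(index)
            .waitForActiveShards(ActiveShardCount.ALL)
            .settings(Settings.builder().put("index.number_of_shards", 1).build())).actionGet();

        // 2. Put a mapping and index the documents that the restore will be checked against.
        client.admin().indices().putMapping(
            new PutMappingRequest(index).type("_doc").source("foo", "type=text")).actionGet();
        for (int i = 0; i < documents; ++i) {
            client.bulk(new BulkRequest()
                .add(new IndexRequest(index).source(Collections.singletonMap("foo", "bar" + i)))
                .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)).actionGet();
        }

        // 3. Snapshot, delete the index, then restore it from the snapshot.
        client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
            .setWaitForCompletion(true).get();
        client.admin().indices().delete(new DeleteIndexRequest(index)).actionGet();
        client.admin().cluster().restoreSnapshot(
            new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true)).actionGet();

        // 4. Verify the restored index contains the expected number of documents.
        long hits = client.search(new SearchRequest(index)
            .source(new SearchSourceBuilder().size(0).trackTotalHits(true)))
            .actionGet().getHits().getTotalHits().value;
        assert hits == documents : "expected " + documents + " documents but found " + hits;
    }
}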
Armin Braun 2019-04-26 09:04:34 +02:00 committed by GitHub
parent 6996739a09
commit 6a24fd3f26
1 changed file with 149 additions and 15 deletions


@@ -34,18 +34,42 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSn
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateAction;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction;
import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction;
import org.elasticsearch.action.bulk.BulkAction;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.TransportBulkAction;
import org.elasticsearch.action.bulk.TransportShardBulkAction;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.resync.TransportResyncReplicationAction;
import org.elasticsearch.action.search.SearchAction;
import org.elasticsearch.action.search.SearchExecutionStatsCollector;
import org.elasticsearch.action.search.SearchPhaseController;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchTransportService;
import org.elasticsearch.action.search.TransportSearchAction;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.TransportAction;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterModule;
@@ -54,6 +78,7 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.coordination.ClusterBootstrapService;
@@ -68,6 +93,8 @@ import org.elasticsearch.cluster.metadata.AliasValidator;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService;
import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService;
import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService;
import org.elasticsearch.cluster.metadata.MetaDataMappingService;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
@@ -100,7 +127,9 @@ import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction;
import org.elasticsearch.index.seqno.RetentionLeaseSyncAction;
import org.elasticsearch.index.shard.PrimaryReplicaSyncer;
import org.elasticsearch.indices.IndicesModule;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService;
import org.elasticsearch.indices.cluster.IndicesClusterStateService;
@@ -109,13 +138,16 @@ import org.elasticsearch.indices.mapper.MapperRegistry;
import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.ingest.IngestService;
import org.elasticsearch.node.ResponseCollectorService;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.fs.FsRepository;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.FetchPhase;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.disruption.DisruptableMockTransport;
import org.elasticsearch.test.disruption.NetworkDisruption;
@@ -138,10 +170,12 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -177,7 +211,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
testClusterNodes.nodes.values().forEach(TestClusterNode::stop);
}
public void testSuccessfulSnapshot() {
public void testSuccessfulSnapshotAndRestore() {
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
String repoName = "repo";
@@ -185,10 +219,12 @@ public class SnapshotResiliencyTests extends ESTestCase {
final String index = "test";
final int shards = randomIntBetween(1, 10);
final int documents = randomIntBetween(0, 100);
TestClusterNode masterNode =
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
final AtomicBoolean createdSnapshot = new AtomicBoolean();
final AtomicBoolean snapshotRestored = new AtomicBoolean();
final AtomicBoolean documentCountVerified = new AtomicBoolean();
masterNode.client.admin().cluster().preparePutRepository(repoName)
.setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10)))
.execute(
@@ -197,12 +233,61 @@ public class SnapshotResiliencyTests extends ESTestCase {
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
() -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.execute(assertNoFailureListener(() -> createdSnapshot.set(true)))))));
deterministicTaskQueue.runAllRunnableTasks();
() -> {
final Runnable afterIndexing = () ->
masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
.setWaitForCompletion(true).execute(assertNoFailureListener(() -> {
createdSnapshot.set(true);
masterNode.client.admin().indices().delete(
new DeleteIndexRequest(index),
assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot(
new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true),
assertNoFailureListener(restoreSnapshotResponse -> {
snapshotRestored.set(true);
assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards());
masterNode.client.search(
new SearchRequest(index).source(
new SearchSourceBuilder().size(0).trackTotalHits(true)
),
assertNoFailureListener(r -> {
assertEquals(
(long) documents,
Objects.requireNonNull(r.getHits().getTotalHits()).value
);
documentCountVerified.set(true);
}));
})
)));
}));
final AtomicInteger countdown = new AtomicInteger(documents);
masterNode.client.admin().indices().putMapping(
new PutMappingRequest(index).type("_doc").source("foo", "type=text"),
assertNoFailureListener(r -> {
for (int i = 0; i < documents; ++i) {
masterNode.client.bulk(
new BulkRequest().add(new IndexRequest(index).source(
Collections.singletonMap("foo", "bar" + i)))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE),
assertNoFailureListener(
bulkResponse -> {
assertFalse(
"Failures in bulkresponse: " + bulkResponse.buildFailureMessage(),
bulkResponse.hasFailures());
if (countdown.decrementAndGet() == 0) {
afterIndexing.run();
}
}));
}
if (documents == 0) {
afterIndexing.run();
}
}
));
}))));
runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L));
assertTrue(createdSnapshot.get());
assertTrue(snapshotRestored.get());
assertTrue(documentCountVerified.get());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
final Repository repository = masterNode.repositoriesService.repository(repoName);
@@ -236,7 +321,6 @@ public class SnapshotResiliencyTests extends ESTestCase {
.execute(
assertNoFailureListener(
() -> masterNode.client.admin().indices().create(
new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL)
.settings(defaultIndexSettings(shards)),
assertNoFailureListener(
@@ -833,6 +917,8 @@ public class SnapshotResiliencyTests extends ESTestCase {
allocationService = ESAllocationTestCase.createAllocationService(settings);
final IndexScopedSettings indexScopedSettings =
new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS);
final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test");
final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry();
indicesService = new IndicesService(
settings,
mock(PluginsService.class),
@@ -841,12 +927,12 @@ public class SnapshotResiliencyTests extends ESTestCase {
new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(),
emptyMap(), emptyMap(), emptyMap(), emptyMap()),
indexNameExpressionResolver,
new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER),
mapperRegistry,
namedWriteableRegistry,
threadPool,
indexScopedSettings,
new NoneCircuitBreakerService(),
new BigArrays(new PageCacheRecycler(settings), null, "test"),
bigArrays,
scriptService,
client,
new MetaStateService(nodeEnv, namedXContentRegistry),
@@ -863,6 +949,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
new RoutingService(clusterService, allocationService),
threadPool
);
final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService);
indicesClusterStateService = new IndicesClusterStateService(
settings,
indicesService,
@@ -870,7 +957,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
threadPool,
new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService),
shardStateAction,
new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)),
new NodeMappingRefreshAction(transportService, metaDataMappingService),
repositoriesService,
mock(SearchService.class),
new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver),
@@ -915,14 +1002,61 @@ public class SnapshotResiliencyTests extends ESTestCase {
actionFilters,
indexNameExpressionResolver));
Map<Action, TransportAction> actions = new HashMap<>();
final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService,
indicesService,
allocationService, new AliasValidator(), environment, indexScopedSettings,
threadPool, namedXContentRegistry, false);
actions.put(CreateIndexAction.INSTANCE,
new TransportCreateIndexAction(
transportService, clusterService, threadPool,
new MetaDataCreateIndexService(settings, clusterService, indicesService,
allocationService, new AliasValidator(), environment, indexScopedSettings,
threadPool, namedXContentRegistry, false),
metaDataCreateIndexService,
actionFilters, indexNameExpressionResolver
));
final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings);
mappingUpdatedAction.setClient(client);
final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService,
clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService),
actionFilters, indexNameExpressionResolver);
actions.put(BulkAction.INSTANCE,
new TransportBulkAction(threadPool, transportService, clusterService,
new IngestService(
clusterService, threadPool, environment, scriptService,
new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(),
Collections.emptyList()),
transportShardBulkAction, client, actionFilters, indexNameExpressionResolver,
new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver)
));
final RestoreService restoreService = new RestoreService(
clusterService, repositoriesService, allocationService,
metaDataCreateIndexService,
new MetaDataIndexUpgradeService(
settings, namedXContentRegistry,
mapperRegistry,
indexScopedSettings,
Collections.emptyList()
),
clusterSettings
);
actions.put(PutMappingAction.INSTANCE,
new TransportPutMappingAction(transportService, clusterService, threadPool, metaDataMappingService,
actionFilters, indexNameExpressionResolver, new TransportPutMappingAction.RequestValidators(Collections.emptyList())));
final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService);
final SearchTransportService searchTransportService = new SearchTransportService(transportService,
SearchExecutionStatsCollector.makeWrapper(responseCollectorService));
final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService,
bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService);
actions.put(SearchAction.INSTANCE,
new TransportSearchAction(threadPool, transportService, searchService,
searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService,
actionFilters, indexNameExpressionResolver));
actions.put(RestoreSnapshotAction.INSTANCE,
new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters,
indexNameExpressionResolver));
actions.put(DeleteIndexAction.INSTANCE,
new TransportDeleteIndexAction(
transportService, clusterService, threadPool,
new MetaDataDeleteIndexService(settings, clusterService, allocationService), actionFilters,
indexNameExpressionResolver, new DestructiveOperations(settings, clusterSettings)));
actions.put(PutRepositoryAction.INSTANCE,
new TransportPutRepositoryAction(
transportService, clusterService, repositoriesService, threadPool,