From 6a24fd3f260e03fd022e65bac6d6710850c68abd Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Fri, 26 Apr 2019 09:04:34 +0200 Subject: [PATCH] Add Restore Operation to SnapshotResiliencyTests (#40634) (#41546) * Add Restore Operation to SnapshotResiliencyTests * Expand the successful snapshot test case to also include restoring the snapshop * Add indexing of documents as well to be able to meaningfully verify the restore * This is part of the larger effort to test eventually consistent blob stores in #39504 --- .../snapshots/SnapshotResiliencyTests.java | 164 ++++++++++++++++-- 1 file changed, 149 insertions(+), 15 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 12be86a4115..29fdc91a367 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -34,18 +34,42 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.TransportCreateSn import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotAction; import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.admin.cluster.snapshots.delete.TransportDeleteSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotAction; +import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest; +import org.elasticsearch.action.admin.cluster.snapshots.restore.TransportRestoreSnapshotAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateAction; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction; import org.elasticsearch.action.admin.indices.create.CreateIndexAction; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.TransportCreateIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction; +import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; +import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingAction; +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; +import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction; import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresAction; import org.elasticsearch.action.admin.indices.shards.TransportIndicesShardStoresAction; +import org.elasticsearch.action.bulk.BulkAction; +import org.elasticsearch.action.bulk.BulkRequest; +import org.elasticsearch.action.bulk.TransportBulkAction; +import org.elasticsearch.action.bulk.TransportShardBulkAction; +import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchExecutionStatsCollector; +import org.elasticsearch.action.search.SearchPhaseController; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchTransportService; +import org.elasticsearch.action.search.TransportSearchAction; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.ActiveShardCount; +import org.elasticsearch.action.support.AutoCreateIndex; +import org.elasticsearch.action.support.DestructiveOperations; import org.elasticsearch.action.support.TransportAction; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.cluster.ClusterModule; @@ -54,6 +78,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.index.NodeMappingRefreshAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.coordination.ClusterBootstrapService; @@ -68,6 +93,8 @@ import org.elasticsearch.cluster.metadata.AliasValidator; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MetaDataCreateIndexService; +import org.elasticsearch.cluster.metadata.MetaDataDeleteIndexService; +import org.elasticsearch.cluster.metadata.MetaDataIndexUpgradeService; import org.elasticsearch.cluster.metadata.MetaDataMappingService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -100,7 +127,9 @@ import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction; import org.elasticsearch.index.seqno.RetentionLeaseSyncAction; import org.elasticsearch.index.shard.PrimaryReplicaSyncer; +import org.elasticsearch.indices.IndicesModule; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.indices.cluster.FakeThreadPoolMasterService; import org.elasticsearch.indices.cluster.IndicesClusterStateService; @@ -109,13 +138,16 @@ import org.elasticsearch.indices.mapper.MapperRegistry; import org.elasticsearch.indices.recovery.PeerRecoverySourceService; import org.elasticsearch.indices.recovery.PeerRecoveryTargetService; import org.elasticsearch.indices.recovery.RecoverySettings; -import org.elasticsearch.plugins.MapperPlugin; +import org.elasticsearch.ingest.IngestService; +import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.fetch.FetchPhase; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.disruption.DisruptableMockTransport; import org.elasticsearch.test.disruption.NetworkDisruption; @@ -138,10 +170,12 @@ import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -177,7 +211,7 @@ public class SnapshotResiliencyTests extends ESTestCase { testClusterNodes.nodes.values().forEach(TestClusterNode::stop); } - public void testSuccessfulSnapshot() { + public void testSuccessfulSnapshotAndRestore() { setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10)); String repoName = "repo"; @@ -185,10 +219,12 @@ public class SnapshotResiliencyTests extends ESTestCase { final String index = "test"; final int shards = randomIntBetween(1, 10); - + final int documents = randomIntBetween(0, 100); TestClusterNode masterNode = testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state()); final AtomicBoolean createdSnapshot = new AtomicBoolean(); + final AtomicBoolean snapshotRestored = new AtomicBoolean(); + final AtomicBoolean documentCountVerified = new AtomicBoolean(); masterNode.client.admin().cluster().preparePutRepository(repoName) .setType(FsRepository.TYPE).setSettings(Settings.builder().put("location", randomAlphaOfLength(10))) .execute( @@ -197,12 +233,61 @@ public class SnapshotResiliencyTests extends ESTestCase { new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) .settings(defaultIndexSettings(shards)), assertNoFailureListener( - () -> masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) - .execute(assertNoFailureListener(() -> createdSnapshot.set(true))))))); - - deterministicTaskQueue.runAllRunnableTasks(); - + () -> { + final Runnable afterIndexing = () -> + masterNode.client.admin().cluster().prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true).execute(assertNoFailureListener(() -> { + createdSnapshot.set(true); + masterNode.client.admin().indices().delete( + new DeleteIndexRequest(index), + assertNoFailureListener(() -> masterNode.client.admin().cluster().restoreSnapshot( + new RestoreSnapshotRequest(repoName, snapshotName).waitForCompletion(true), + assertNoFailureListener(restoreSnapshotResponse -> { + snapshotRestored.set(true); + assertEquals(shards, restoreSnapshotResponse.getRestoreInfo().totalShards()); + masterNode.client.search( + new SearchRequest(index).source( + new SearchSourceBuilder().size(0).trackTotalHits(true) + ), + assertNoFailureListener(r -> { + assertEquals( + (long) documents, + Objects.requireNonNull(r.getHits().getTotalHits()).value + ); + documentCountVerified.set(true); + })); + }) + ))); + })); + final AtomicInteger countdown = new AtomicInteger(documents); + masterNode.client.admin().indices().putMapping( + new PutMappingRequest(index).type("_doc").source("foo", "type=text"), + assertNoFailureListener(r -> { + for (int i = 0; i < documents; ++i) { + masterNode.client.bulk( + new BulkRequest().add(new IndexRequest(index).source( + Collections.singletonMap("foo", "bar" + i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), + assertNoFailureListener( + bulkResponse -> { + assertFalse( + "Failures in bulkresponse: " + bulkResponse.buildFailureMessage(), + bulkResponse.hasFailures()); + if (countdown.decrementAndGet() == 0) { + afterIndexing.run(); + } + })); + } + if (documents == 0) { + afterIndexing.run(); + } + } + )); + })))); + runUntil(documentCountVerified::get, TimeUnit.MINUTES.toMillis(5L)); assertTrue(createdSnapshot.get()); + assertTrue(snapshotRestored.get()); + assertTrue(documentCountVerified.get()); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); final Repository repository = masterNode.repositoriesService.repository(repoName); @@ -236,7 +321,6 @@ public class SnapshotResiliencyTests extends ESTestCase { .execute( assertNoFailureListener( () -> masterNode.client.admin().indices().create( - new CreateIndexRequest(index).waitForActiveShards(ActiveShardCount.ALL) .settings(defaultIndexSettings(shards)), assertNoFailureListener( @@ -833,6 +917,8 @@ public class SnapshotResiliencyTests extends ESTestCase { allocationService = ESAllocationTestCase.createAllocationService(settings); final IndexScopedSettings indexScopedSettings = new IndexScopedSettings(settings, IndexScopedSettings.BUILT_IN_INDEX_SETTINGS); + final BigArrays bigArrays = new BigArrays(new PageCacheRecycler(settings), null, "test"); + final MapperRegistry mapperRegistry = new IndicesModule(Collections.emptyList()).getMapperRegistry(); indicesService = new IndicesService( settings, mock(PluginsService.class), @@ -841,12 +927,12 @@ public class SnapshotResiliencyTests extends ESTestCase { new AnalysisRegistry(environment, emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap(), emptyMap()), indexNameExpressionResolver, - new MapperRegistry(emptyMap(), emptyMap(), MapperPlugin.NOOP_FIELD_FILTER), + mapperRegistry, namedWriteableRegistry, threadPool, indexScopedSettings, new NoneCircuitBreakerService(), - new BigArrays(new PageCacheRecycler(settings), null, "test"), + bigArrays, scriptService, client, new MetaStateService(nodeEnv, namedXContentRegistry), @@ -863,6 +949,7 @@ public class SnapshotResiliencyTests extends ESTestCase { new RoutingService(clusterService, allocationService), threadPool ); + final MetaDataMappingService metaDataMappingService = new MetaDataMappingService(clusterService, indicesService); indicesClusterStateService = new IndicesClusterStateService( settings, indicesService, @@ -870,7 +957,7 @@ public class SnapshotResiliencyTests extends ESTestCase { threadPool, new PeerRecoveryTargetService(threadPool, transportService, recoverySettings, clusterService), shardStateAction, - new NodeMappingRefreshAction(transportService, new MetaDataMappingService(clusterService, indicesService)), + new NodeMappingRefreshAction(transportService, metaDataMappingService), repositoriesService, mock(SearchService.class), new SyncedFlushService(indicesService, clusterService, transportService, indexNameExpressionResolver), @@ -915,14 +1002,61 @@ public class SnapshotResiliencyTests extends ESTestCase { actionFilters, indexNameExpressionResolver)); Map actions = new HashMap<>(); + final MetaDataCreateIndexService metaDataCreateIndexService = new MetaDataCreateIndexService(settings, clusterService, + indicesService, + allocationService, new AliasValidator(), environment, indexScopedSettings, + threadPool, namedXContentRegistry, false); actions.put(CreateIndexAction.INSTANCE, new TransportCreateIndexAction( transportService, clusterService, threadPool, - new MetaDataCreateIndexService(settings, clusterService, indicesService, - allocationService, new AliasValidator(), environment, indexScopedSettings, - threadPool, namedXContentRegistry, false), + metaDataCreateIndexService, actionFilters, indexNameExpressionResolver )); + final MappingUpdatedAction mappingUpdatedAction = new MappingUpdatedAction(settings, clusterSettings); + mappingUpdatedAction.setClient(client); + final TransportShardBulkAction transportShardBulkAction = new TransportShardBulkAction(settings, transportService, + clusterService, indicesService, threadPool, shardStateAction, mappingUpdatedAction, new UpdateHelper(scriptService), + actionFilters, indexNameExpressionResolver); + actions.put(BulkAction.INSTANCE, + new TransportBulkAction(threadPool, transportService, clusterService, + new IngestService( + clusterService, threadPool, environment, scriptService, + new AnalysisModule(environment, Collections.emptyList()).getAnalysisRegistry(), + Collections.emptyList()), + transportShardBulkAction, client, actionFilters, indexNameExpressionResolver, + new AutoCreateIndex(settings, clusterSettings, indexNameExpressionResolver) + )); + final RestoreService restoreService = new RestoreService( + clusterService, repositoriesService, allocationService, + metaDataCreateIndexService, + new MetaDataIndexUpgradeService( + settings, namedXContentRegistry, + mapperRegistry, + indexScopedSettings, + Collections.emptyList() + ), + clusterSettings + ); + actions.put(PutMappingAction.INSTANCE, + new TransportPutMappingAction(transportService, clusterService, threadPool, metaDataMappingService, + actionFilters, indexNameExpressionResolver, new TransportPutMappingAction.RequestValidators(Collections.emptyList()))); + final ResponseCollectorService responseCollectorService = new ResponseCollectorService(clusterService); + final SearchTransportService searchTransportService = new SearchTransportService(transportService, + SearchExecutionStatsCollector.makeWrapper(responseCollectorService)); + final SearchService searchService = new SearchService(clusterService, indicesService, threadPool, scriptService, + bigArrays, new FetchPhase(Collections.emptyList()), responseCollectorService); + actions.put(SearchAction.INSTANCE, + new TransportSearchAction(threadPool, transportService, searchService, + searchTransportService, new SearchPhaseController(searchService::createReduceContext), clusterService, + actionFilters, indexNameExpressionResolver)); + actions.put(RestoreSnapshotAction.INSTANCE, + new TransportRestoreSnapshotAction(transportService, clusterService, threadPool, restoreService, actionFilters, + indexNameExpressionResolver)); + actions.put(DeleteIndexAction.INSTANCE, + new TransportDeleteIndexAction( + transportService, clusterService, threadPool, + new MetaDataDeleteIndexService(settings, clusterService, allocationService), actionFilters, + indexNameExpressionResolver, new DestructiveOperations(settings, clusterSettings))); actions.put(PutRepositoryAction.INSTANCE, new TransportPutRepositoryAction( transportService, clusterService, repositoriesService, threadPool,