From af9b98e81c001ba9589823d1a84d90b556ee8022 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 8 Jul 2019 10:55:39 +0200 Subject: [PATCH] Recursively Delete Unreferenced Index Directories (#42189) (#44051) * Use ability to list child "folders" in the blob store to implement recursive delete on all stale index folders when cleaning up instead of using the diff between two `RepositoryData` instances to cover aborted deletes * Runs after every delete operation * Relates #13159 (fixing most of the issues caused by unreferenced indices, leaving only some meta files to be cleaned up) --- .../repositories/hdfs/HdfsTests.java | 7 + .../s3/S3RepositoryThirdPartyTests.java | 16 +++ .../blobstore/BlobStoreRepository.java | 55 +++++--- .../AbstractSnapshotIntegTestCase.java | 28 ++++ .../DedicatedClusterSnapshotRestoreIT.java | 2 + ...etadataLoadingDuringSnapshotRestoreIT.java | 4 +- .../snapshots/RepositoriesIT.java | 2 + .../SharedClusterSnapshotRestoreIT.java | 26 ++-- .../snapshots/SnapshotResiliencyTests.java | 127 +---------------- .../snapshots/mockstore/MockRepository.java | 12 ++ .../AbstractThirdPartyRepositoryTestCase.java | 82 +++++++++++ .../blobstore/BlobStoreTestUtil.java | 133 ++++++++++++++++++ 12 files changed, 345 insertions(+), 149 deletions(-) create mode 100644 test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java diff --git a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java index 88454188da5..96eae1bd53f 100644 --- a/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java +++ b/plugins/repository-hdfs/src/test/java/org/elasticsearch/repositories/hdfs/HdfsTests.java @@ -27,9 +27,13 @@ import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.RepositoryException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.util.Collection; @@ -145,6 +149,9 @@ public class HdfsTests extends ESSingleNodeTestCase { ClusterState clusterState = client.admin().cluster().prepareState().get().getState(); assertThat(clusterState.getMetaData().hasIndex("test-idx-1"), equalTo(true)); assertThat(clusterState.getMetaData().hasIndex("test-idx-2"), equalTo(false)); + final BlobStoreRepository repo = + (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); + BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC)); } public void testMissingUri() { diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java index bdaace00f80..7e7ac8d4300 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3RepositoryThirdPartyTests.java @@ -26,10 +26,12 @@ import
org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.test.StreamsUtils; import java.io.IOException; import java.util.Collection; +import java.util.concurrent.Executor; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -76,6 +78,20 @@ public class S3RepositoryThirdPartyTests extends AbstractThirdPartyRepositoryTes } @Override + protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor genericExec) throws Exception { + // S3 is only eventually consistent for the list operations used by these assertions so we retry for 10 minutes assuming that + // listing operations will become consistent within these 10 minutes. + assertBusy(() -> assertTrue(super.assertCorruptionVisible(repo, genericExec)), 10L, TimeUnit.MINUTES); + return true; + } + + @Override + protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception { + // S3 is only eventually consistent for the list operations used by these assertions so we retry for 10 minutes assuming that + // listing operations will become consistent within these 10 minutes. + assertBusy(() -> super.assertConsistentRepository(repo, executor), 10L, TimeUnit.MINUTES); + } + + @Override protected void assertBlobsByPrefix(BlobPath path, String prefix, Map<String, BlobMetaData> blobs) throws Exception { // AWS S3 is eventually consistent so we retry for 10 minutes assuming a list operation will never take longer than that to become consistent. diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 6c72dd7f844..3b31e2a7d53 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -57,7 +57,6 @@ import org.elasticsearch.common.metrics.CounterMetric; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentFactory; @@ -419,46 +418,68 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex); } // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots - final RepositoryData repositoryData; final RepositoryData updatedRepositoryData; + final Map<String, BlobContainer> foundIndices; try { - repositoryData = getRepositoryData(); + final RepositoryData repositoryData = getRepositoryData(); updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); + // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never + // delete an index that was created by another master node after writing this index-N blob.
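+ // children() returns one child BlobContainer per index folder, keyed by the folder name, i.e. the index UUID.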
+ foundIndices = blobStore().blobContainer(basePath().add("indices")).children(); writeIndexGen(updatedRepositoryData, repositoryStateId); } catch (Exception ex) { listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex)); return; } final SnapshotInfo finalSnapshotInfo = snapshot; - final Collection<IndexId> unreferencedIndices = Sets.newHashSet(repositoryData.getIndices().values()); - unreferencedIndices.removeAll(updatedRepositoryData.getIndices().values()); try { blobContainer().deleteBlobsIgnoringIfNotExists( Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()))); } catch (IOException e) { logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e); } + final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices(); deleteIndices( Optional.ofNullable(finalSnapshotInfo) - .map(info -> info.indices().stream().map(repositoryData::resolveIndexId).collect(Collectors.toList())) + .map(info -> info.indices().stream().filter(survivingIndices::containsKey) + .map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList())) .orElse(Collections.emptyList()), snapshotId, ActionListener.map(listener, v -> { - try { - blobStore().blobContainer(indicesPath()).deleteBlobsIgnoringIfNotExists( - unreferencedIndices.stream().map(IndexId::getId).collect(Collectors.toList())); - } catch (IOException e) { - logger.warn(() -> - new ParameterizedMessage( - "[{}] indices {} are no longer part of any snapshots in the repository, " + - "but failed to clean up their index folders.", metadata.name(), unreferencedIndices), e); - } + cleanupStaleIndices(foundIndices, survivingIndices); return null; }) ); } } + private void cleanupStaleIndices(Map<String, BlobContainer> foundIndices, Map<String, IndexId> survivingIndices) { + try { + final Set<String> survivingIndexIds = survivingIndices.values().stream() + .map(IndexId::getId).collect(Collectors.toSet()); + for (Map.Entry<String, BlobContainer> indexEntry : foundIndices.entrySet()) { + final String indexSnId = indexEntry.getKey(); + try { + if (survivingIndexIds.contains(indexSnId) == false) { + logger.debug("[{}] Found stale index [{}]. Cleaning it up", metadata.name(), indexSnId); + indexEntry.getValue().delete(); + logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId); + } + } catch (IOException e) { + logger.warn(() -> new ParameterizedMessage( + "[{}] index {} is no longer part of any snapshots in the repository, " + + "but failed to clean up its index folder", metadata.name(), indexSnId), e); + } + } + } catch (Exception e) { + // TODO: We shouldn't be blanket catching and suppressing all exceptions here; instead we should handle them safely upstream. + // Currently this catch exists as a stopgap solution to tackle unexpected runtime exceptions from implementations + // bubbling up and breaking the snapshot functionality.
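+ // The assert fails tests (which run with assertions enabled) on any unexpected exception, while production builds only log the warning below.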
+ assert false : e; + logger.warn(new ParameterizedMessage("[{}] Exception during cleanup of stale indices", metadata.name()), e); + } + } + private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) { if (indices.isEmpty()) { listener.onResponse(null); @@ -523,9 +544,9 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp startTime, failure, System.currentTimeMillis(), totalShards, shardFailures, includeGlobalState, userMetadata); try { + final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices); snapshotFormat.write(blobStoreSnapshot, blobContainer(), snapshotId.getUUID()); - final RepositoryData repositoryData = getRepositoryData(); - writeIndexGen(repositoryData.addSnapshot(snapshotId, blobStoreSnapshot.state(), indices), repositoryStateId); + writeIndexGen(updatedRepositoryData, repositoryStateId); } catch (FileAlreadyExistsException ex) { // if another master was elected and took over finalizing the snapshot, it is possible // that both nodes try to finalize the snapshot and write to the same blobs, so we just diff --git a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java index 9fe7356877c..e44d0f4e2de 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java +++ b/server/src/test/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java @@ -20,11 +20,13 @@ package org.elasticsearch.snapshots; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.cluster.SnapshotsInProgress; +import org.elasticsearch.cluster.metadata.RepositoryMetaData; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.mockstore.MockRepository; import org.elasticsearch.test.ESIntegTestCase; import org.junit.After; @@ -65,6 +67,32 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase { internalCluster().assertConsistentHistoryBetweenTranslogAndLuceneIndex(); } + private String skipRepoConsistencyCheckReason; + + @After + public void assertRepoConsistency() { + if (skipRepoConsistencyCheckReason == null) { + client().admin().cluster().prepareGetRepositories().get().repositories() + .stream() + .map(RepositoryMetaData::name) + .forEach(name -> { + final List<SnapshotInfo> snapshots = client().admin().cluster().prepareGetSnapshots(name).get().getSnapshots(); + // Delete one random snapshot to trigger repository cleanup.
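+ // Stale-index cleanup runs after every snapshot delete, so this exercises it once for each repository a test used.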
+ if (snapshots.isEmpty() == false) { + client().admin().cluster().prepareDeleteSnapshot(name, randomFrom(snapshots).snapshotId().getName()).get(); + } + BlobStoreTestUtil.assertRepoConsistency(internalCluster(), name); + }); + } else { + logger.info("--> skipped repo consistency checks because [{}]", skipRepoConsistencyCheckReason); + } + } + + protected void disableRepoConsistencyCheck(String reason) { + assertNotNull(reason); + skipRepoConsistencyCheckReason = reason; + } + public static long getFailureCount(String repository) { long failureCount = 0; for (RepositoriesService repositoriesService : diff --git a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index e1f75bb0a81..6439bc989ab 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -723,6 +723,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest } public void testRegistrationFailure() { + disableRepoConsistencyCheck("This test does not create any data in the repository"); logger.info("--> start first node"); internalCluster().startNode(); logger.info("--> start second node"); @@ -742,6 +743,7 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest } public void testThatSensitiveRepositorySettingsAreNotExposed() throws Exception { + disableRepoConsistencyCheck("This test does not create any data in the repository"); Settings nodeSettings = Settings.EMPTY; logger.info("--> start two nodes"); internalCluster().startNodes(2, nodeSettings); diff --git a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index 040f12c9566..81302399c76 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -144,8 +144,8 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte // Deleting a snapshot does not load the global metadata state but loads each index metadata assertAcked(client().admin().cluster().prepareDeleteSnapshot("repository", "snap").get()); assertGlobalMetadataLoads("snap", 1); - assertIndexMetadataLoads("snap", "docs", 5); - assertIndexMetadataLoads("snap", "others", 4); + assertIndexMetadataLoads("snap", "docs", 4); + assertIndexMetadataLoads("snap", "others", 3); } private void assertGlobalMetadataLoads(final String snapshot, final int times) { diff --git a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java index 63e2c31ea3e..455a3f18e54 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/RepositoriesIT.java @@ -184,6 +184,8 @@ public class RepositoriesIT extends AbstractSnapshotIntegTestCase { } public void testRepositoryVerification() throws Exception { + disableRepoConsistencyCheck("This test does not create any data in the repository."); + Client client = client(); Settings settings = Settings.builder() diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java 
b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index ec0106de5ef..7cba57fd2d5 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -363,7 +363,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(client.prepareGet(restoredIndexName, typeName, docId).get().isExists(), equalTo(true)); } - public void testFreshIndexUUID() { + public void testFreshIndexUUID() throws InterruptedException { Client client = client(); logger.info("--> creating repository"); @@ -541,7 +541,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> check that aliases are not restored and existing aliases still exist"); assertAliasesMissing(client.admin().indices().prepareAliasesExist("alias-123", "alias-1").get()); assertAliasesExist(client.admin().indices().prepareAliasesExist("alias-3").get()); - } public void testRestoreTemplates() throws Exception { @@ -594,7 +593,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas logger.info("--> check that template is restored"); getIndexTemplatesResponse = client().admin().indices().prepareGetTemplates().get(); assertIndexTemplateExists(getIndexTemplatesResponse, "test-template"); - } public void testIncludeGlobalState() throws Exception { @@ -781,10 +779,10 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertFalse(client().admin().cluster().prepareGetPipeline("barbaz").get().isFound()); assertNull(client().admin().cluster().prepareGetStoredScript("foobar").get().getSource()); assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits().value, equalTo(100L)); - } - public void testSnapshotFileFailureDuringSnapshot() { + public void testSnapshotFileFailureDuringSnapshot() throws InterruptedException { + disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks"); Client client = client(); logger.info("--> creating repository"); @@ -911,6 +909,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testDataFileFailureDuringRestore() throws Exception { + disableRepoConsistencyCheck("This test intentionally leaves a broken repository"); + Path repositoryLocation = randomRepoPath(); Client client = client(); logger.info("--> creating repository"); @@ -974,6 +974,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testDataFileCorruptionDuringRestore() throws Exception { + disableRepoConsistencyCheck("This test intentionally leaves a broken repository"); + Path repositoryLocation = randomRepoPath(); Client client = client(); logger.info("--> creating repository"); @@ -1238,7 +1240,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertThat(restoreSnapshotResponse.getRestoreInfo().failedShards(), equalTo(0)); SearchResponse countResponse = client.prepareSearch("test-idx").setSize(0).get(); assertThat(countResponse.getHits().getTotalHits().value, equalTo(100L)); - } public void testUnallocatedShards() throws Exception { @@ -1703,8 +1704,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas .setIndices("test-idx-1").setRenamePattern("test-idx").setRenameReplacement("alias") 
.setWaitForCompletion(true).setIncludeAliases(false).execute().actionGet(); assertThat(restoreSnapshotResponse.getRestoreInfo().totalShards(), greaterThan(0)); - - } public void testMoveShardWhileSnapshotting() throws Exception { @@ -1771,6 +1770,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testDeleteRepositoryWhileSnapshotting() throws Exception { + disableRepoConsistencyCheck("This test uses a purposely broken repository so it would fail consistency checks"); Client client = client(); Path repositoryLocation = randomRepoPath(); logger.info("--> creating repository"); @@ -2329,7 +2329,6 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "Foo")).get(), numdocs); assertHitCount(client.prepareSearch("test-idx").setSize(0).setQuery(matchQuery("field1", "bar")).get(), numdocs); - } public void testRecreateBlocksOnRestore() throws Exception { @@ -2423,6 +2422,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testCloseOrDeleteIndexDuringSnapshot() throws Exception { + disableRepoConsistencyCheck("This test intentionally leaves a broken repository"); + Client client = client(); boolean allowPartial = randomBoolean(); @@ -2747,6 +2748,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testSnapshotName() throws Exception { + disableRepoConsistencyCheck("This test does not create any data in the repository"); + final Client client = client(); logger.info("--> creating repository"); @@ -2767,6 +2770,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testListCorruptedSnapshot() throws Exception { + disableRepoConsistencyCheck("This test intentionally leaves a broken repository"); + Client client = client(); Path repo = randomRepoPath(); logger.info("--> creating repository at {}", repo.toAbsolutePath()); @@ -3336,6 +3341,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas } public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception { + // TODO: Fix repo cleanup logic to handle these leaked snap-{uuid}.dat files and only exclude test-repo (the mock repo) here.
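+ // The purposely unreliable repository in this test can leak snap-{uuid}.dat blobs on failed writes, which the cleanup does not yet remove.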
+ disableRepoConsistencyCheck( + "This test uses a purposely broken repository implementation that results in leaking snap-{uuid}.dat files"); logger.info("--> creating repository"); final Path repoPath = randomRepoPath(); final Client client = client(); diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index 744248d034d..21b05879cac 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -108,8 +108,6 @@ import org.elasticsearch.cluster.routing.allocation.command.AllocateEmptyPrimary import org.elasticsearch.cluster.service.ClusterApplierService; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.cluster.service.MasterService; -import org.elasticsearch.common.Strings; -import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.network.NetworkModule; import org.elasticsearch.common.settings.ClusterSettings; @@ -120,11 +118,7 @@ import org.elasticsearch.common.util.BigArrays; import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; -import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentHelper; -import org.elasticsearch.common.xcontent.XContentParser; -import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; @@ -149,10 +143,10 @@ import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.ingest.IngestService; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.plugins.PluginsService; -import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; -import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.repositories.fs.FsRepository; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; @@ -170,16 +164,8 @@ import org.elasticsearch.transport.TransportService; import org.junit.After; import org.junit.Before; -import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.file.DirectoryNotEmptyException; -import java.nio.file.FileVisitResult; -import java.nio.file.Files; -import java.nio.file.NoSuchFileException; import java.nio.file.Path; -import java.nio.file.SimpleFileVisitor; -import java.nio.file.attribute.BasicFileAttributes; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -225,9 +211,11 @@ public class SnapshotResiliencyTests extends ESTestCase { } @After - public void verifyReposThenStopServices() throws IOException { + public void verifyReposThenStopServices() { try { - assertNoStaleRepositoryData(); + BlobStoreTestUtil.assertConsistency( + (BlobStoreRepository) 
testClusterNodes.randomMasterNodeSafe().repositoriesService.repository("repo"), + Runnable::run); } finally { testClusterNodes.nodes.values().forEach(TestClusterNode::stop); } @@ -525,109 +513,6 @@ public class SnapshotResiliencyTests extends ESTestCase { assertThat(snapshotIds, either(hasSize(1)).or(hasSize(0))); } - /** - * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in any repository. - * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata - */ - private void assertNoStaleRepositoryData() throws IOException { - final Path repoPath = tempDir.resolve("repo").toAbsolutePath(); - final List repos; - try (Stream reposDir = repoFilesByPrefix(repoPath)) { - repos = reposDir.filter(s -> s.getFileName().toString().startsWith("extra") == false).collect(Collectors.toList()); - } - for (Path repoRoot : repos) { - cleanupEmptyTrees(repoRoot); - final Path latestIndexGenBlob = repoRoot.resolve("index.latest"); - assertTrue("Could not find index.latest blob for repo at [" + repoRoot + ']', Files.exists(latestIndexGenBlob)); - final long latestGen = ByteBuffer.wrap(Files.readAllBytes(latestIndexGenBlob)).getLong(0); - assertIndexGenerations(repoRoot, latestGen); - final RepositoryData repositoryData; - try (XContentParser parser = - XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, - new BytesArray(Files.readAllBytes(repoRoot.resolve("index-" + latestGen))), XContentType.JSON)) { - repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); - } - assertIndexUUIDs(repoRoot, repositoryData); - assertSnapshotUUIDs(repoRoot, repositoryData); - } - } - - // Lucene's mock file system randomly generates empty `extra0` files that break the deletion of blob-store directories. - // We clean those up here before checking a blob-store for stale files. 
- private void cleanupEmptyTrees(Path repoPath) { - try { - Files.walkFileTree(repoPath, new SimpleFileVisitor() { - - @Override - public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { - if (file.getFileName().toString().startsWith("extra")) { - Files.delete(file); - } - return FileVisitResult.CONTINUE; - } - - @Override - public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { - try { - Files.delete(dir); - } catch (DirectoryNotEmptyException e) { - // We're only interested in deleting empty trees here, just ignore directories with content - } - return FileVisitResult.CONTINUE; - } - }); - } catch (IOException e) { - throw new AssertionError(e); - } - } - - private static void assertIndexGenerations(Path repoRoot, long latestGen) throws IOException { - try (Stream repoRootBlobs = repoFilesByPrefix(repoRoot)) { - final long[] indexGenerations = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith("index-")) - .map(p -> p.getFileName().toString().replace("index-", "")) - .mapToLong(Long::parseLong).sorted().toArray(); - assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]); - assertTrue(indexGenerations.length <= 2); - } - } - - private static void assertIndexUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { - final List expectedIndexUUIDs = - repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); - try (Stream indexRoots = repoFilesByPrefix(repoRoot.resolve("indices"))) { - final List foundIndexUUIDs = indexRoots.filter(s -> s.getFileName().toString().startsWith("extra") == false) - .map(p -> p.getFileName().toString()).collect(Collectors.toList()); - assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); - } - } - - private static void assertSnapshotUUIDs(Path repoRoot, RepositoryData repositoryData) throws IOException { - final List expectedSnapshotUUIDs = - repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList()); - for (String prefix : new String[]{"snap-", "meta-"}) { - try (Stream repoRootBlobs = repoFilesByPrefix(repoRoot)) { - final Collection foundSnapshotUUIDs = repoRootBlobs.filter(p -> p.getFileName().toString().startsWith(prefix)) - .map(p -> p.getFileName().toString().replace(prefix, "").replace(".dat", "")) - .collect(Collectors.toSet()); - assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY))); - } - } - } - - /** - * List contents of a blob path and return an empty stream if the path doesn't exist. 
- * @param prefix Path to find children for - * @return stream of child paths - * @throws IOException on failure - */ - private static Stream repoFilesByPrefix(Path prefix) throws IOException { - try { - return Files.list(prefix); - } catch (FileNotFoundException | NoSuchFileException e) { - return Stream.empty(); - } - } - private void clearDisruptionsAndAwaitSync() { testClusterNodes.clearNetworkDisruptions(); runUntil(() -> { diff --git a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java index 6879406d13a..21ade636f53 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java +++ b/server/src/test/java/org/elasticsearch/snapshots/mockstore/MockRepository.java @@ -59,6 +59,7 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; public class MockRepository extends FsRepository { private static final Logger logger = LogManager.getLogger(MockRepository.class); @@ -333,6 +334,17 @@ public class MockRepository extends FsRepository { super.deleteBlobIgnoringIfNotExists(blobName); } + @Override + public void delete() throws IOException { + for (BlobContainer child : children().values()) { + child.delete(); + } + for (String blob : listBlobs().values().stream().map(BlobMetaData::name).collect(Collectors.toList())) { + deleteBlobIgnoringIfNotExists(blob); + } + blobStore().blobContainer(path().parent()).deleteBlob(path().toArray()[path().toArray().length - 1]); + } + @Override public Map listBlobs() throws IOException { maybeIOExceptionOrBlock(""); diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java index 28083f49e1a..e32c3945673 100644 --- a/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/repositories/AbstractThirdPartyRepositoryTestCase.java @@ -20,6 +20,7 @@ package org.elasticsearch.repositories; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse; +import org.elasticsearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.common.Strings; import org.elasticsearch.common.blobstore.BlobMetaData; @@ -29,8 +30,10 @@ import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; import org.elasticsearch.common.settings.SecureSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.repositories.blobstore.BlobStoreRepository; +import org.elasticsearch.repositories.blobstore.BlobStoreTestUtil; import org.elasticsearch.snapshots.SnapshotState; import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.threadpool.ThreadPool; import java.io.ByteArrayInputStream; import java.util.Arrays; @@ -184,6 +187,85 @@ public abstract class AbstractThirdPartyRepositoryTestCase extends ESSingleNodeT } } + public void testCleanup() throws Exception { + createRepository("test-repo"); + + createIndex("test-idx-1"); + createIndex("test-idx-2"); + createIndex("test-idx-3"); + ensureGreen(); + + logger.info("--> indexing some data"); + for 
(int i = 0; i < 100; i++) { + client().prepareIndex("test-idx-1", "doc", Integer.toString(i)).setSource("foo", "bar" + i).get(); + client().prepareIndex("test-idx-2", "doc", Integer.toString(i)).setSource("foo", "bar" + i).get(); + client().prepareIndex("test-idx-3", "doc", Integer.toString(i)).setSource("foo", "bar" + i).get(); + } + client().admin().indices().prepareRefresh().get(); + + final String snapshotName = "test-snap-" + System.currentTimeMillis(); + + logger.info("--> snapshot"); + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot("test-repo", snapshotName) + .setWaitForCompletion(true) + .setIndices("test-idx-*", "-test-idx-3") + .get(); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), greaterThan(0)); + assertThat(createSnapshotResponse.getSnapshotInfo().successfulShards(), + equalTo(createSnapshotResponse.getSnapshotInfo().totalShards())); + + assertThat(client().admin() + .cluster() + .prepareGetSnapshots("test-repo") + .setSnapshots(snapshotName) + .get() + .getSnapshots().get(0) + .state(), + equalTo(SnapshotState.SUCCESS)); + + logger.info("--> creating a dangling index folder"); + final BlobStoreRepository repo = + (BlobStoreRepository) getInstanceFromNode(RepositoriesService.class).repository("test-repo"); + final PlainActionFuture<Void> future = PlainActionFuture.newFuture(); + final Executor genericExec = repo.threadPool().executor(ThreadPool.Names.GENERIC); + genericExec.execute(new ActionRunnable<Void>(future) { + @Override + protected void doRun() throws Exception { + final BlobStore blobStore = repo.blobStore(); + blobStore.blobContainer(BlobPath.cleanPath().add("indices").add("foo")) + .writeBlob("bar", new ByteArrayInputStream(new byte[0]), 0, false); + future.onResponse(null); + } + }); + future.actionGet(); + assertTrue(assertCorruptionVisible(repo, genericExec)); + logger.info("--> deleting a snapshot to trigger repository cleanup"); + client().admin().cluster().deleteSnapshot(new DeleteSnapshotRequest("test-repo", snapshotName)).actionGet(); + + assertConsistentRepository(repo, genericExec); + } + + protected boolean assertCorruptionVisible(BlobStoreRepository repo, Executor executor) throws Exception { + final PlainActionFuture<Boolean> future = PlainActionFuture.newFuture(); + executor.execute(new ActionRunnable<Boolean>(future) { + @Override + protected void doRun() throws Exception { + final BlobStore blobStore = repo.blobStore(); + future.onResponse( + blobStore.blobContainer(BlobPath.cleanPath().add("indices")).children().containsKey("foo") + && blobStore.blobContainer(BlobPath.cleanPath().add("indices").add("foo")).blobExists("bar") + ); + } + }); + return future.actionGet(); + } + + protected void assertConsistentRepository(BlobStoreRepository repo, Executor executor) throws Exception { + BlobStoreTestUtil.assertConsistency(repo, executor); + } + protected void assertDeleted(BlobPath path, String name) throws Exception { assertThat(listChildren(path), not(contains(name))); } diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java new file mode 100644 index 00000000000..d75345bf718 --- /dev/null +++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java @@ -0,0 +1,133 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.repositories.blobstore; + +import org.elasticsearch.action.ActionRunnable; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.blobstore.BlobContainer; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.internal.io.Streams; +import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.RepositoriesService; +import org.elasticsearch.repositories.RepositoryData; +import org.elasticsearch.snapshots.SnapshotId; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.threadpool.ThreadPool; + +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +public final class BlobStoreTestUtil { + + public static void assertRepoConsistency(InternalTestCluster testCluster, String repoName) { + final BlobStoreRepository repo = + (BlobStoreRepository) testCluster.getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName); + BlobStoreTestUtil.assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC)); + } + + /** + * Assert that there are no unreferenced indices or unreferenced root-level metadata blobs in the given repository. + * TODO: Expand the logic here to also check for unreferenced segment blobs and shard level metadata + * @param repository BlobStoreRepository to check + * @param executor Executor to run all repository calls on. This is needed since the production {@link BlobStoreRepository} + * implementations assert that all IO-inducing calls happen on the generic or snapshot thread-pools and hence callers + * of this assertion must pass an executor backed by one of those pools when using such an implementation.
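+ * A typical invocation from a test is {@code assertConsistency(repo, repo.threadPool().executor(ThreadPool.Names.GENERIC))}.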
+ */ + public static void assertConsistency(BlobStoreRepository repository, Executor executor) { + final PlainActionFuture<Void> listener = PlainActionFuture.newFuture(); + executor.execute(new ActionRunnable<Void>(listener) { + @Override + protected void doRun() throws Exception { + final BlobContainer blobContainer = repository.blobContainer(); + assertTrue( + "Could not find index.latest blob for repo [" + repository + "]", blobContainer.blobExists("index.latest")); + final long latestGen; + try (DataInputStream inputStream = new DataInputStream(blobContainer.readBlob("index.latest"))) { + latestGen = inputStream.readLong(); + } + assertIndexGenerations(blobContainer, latestGen); + final RepositoryData repositoryData; + try (InputStream inputStream = blobContainer.readBlob("index-" + latestGen); + BytesStreamOutput out = new BytesStreamOutput()) { + Streams.copy(inputStream, out); + try (XContentParser parser = + XContentHelper.createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, + out.bytes(), XContentType.JSON)) { + repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen); + } + } + assertIndexUUIDs(blobContainer, repositoryData); + assertSnapshotUUIDs(blobContainer, repositoryData); + listener.onResponse(null); + } + }); + listener.actionGet(TimeValue.timeValueMinutes(1L)); + } + + private static void assertIndexGenerations(BlobContainer repoRoot, long latestGen) throws IOException { + final long[] indexGenerations = repoRoot.listBlobsByPrefix("index-").keySet().stream() + .map(s -> s.replace("index-", "")) + .mapToLong(Long::parseLong).sorted().toArray(); + assertEquals(latestGen, indexGenerations[indexGenerations.length - 1]); + assertTrue(indexGenerations.length <= 2); + } + + private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { + final List<String> expectedIndexUUIDs = + repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList()); + final BlobContainer indicesContainer = repoRoot.children().get("indices"); + final List<String> foundIndexUUIDs; + if (indicesContainer == null) { + foundIndexUUIDs = Collections.emptyList(); + } else { + foundIndexUUIDs = indicesContainer.children().keySet().stream().filter( + s -> s.startsWith("extra") == false).collect(Collectors.toList()); + } + assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY))); + } + + private static void assertSnapshotUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException { + final List<String> expectedSnapshotUUIDs = + repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toList()); + for (String prefix : new String[]{"snap-", "meta-"}) { + final Collection<String> foundSnapshotUUIDs = repoRoot.listBlobs().keySet().stream().filter(p -> p.startsWith(prefix)) + .map(p -> p.replace(prefix, "").replace(".dat", "")) + .collect(Collectors.toSet()); + assertThat(foundSnapshotUUIDs, containsInAnyOrder(expectedSnapshotUUIDs.toArray(Strings.EMPTY_ARRAY))); + } + } +}
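The cleanup above reduces to: list every child folder of the repository's "indices" container, then recursively delete each folder whose name is not among the index IDs that survive in RepositoryData. The following is a minimal standalone sketch of that flow, not Elasticsearch API: the Folder class is a hypothetical stand-in for BlobContainer, and the UUIDs are made up.

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    // Sketch of the stale-index cleanup flow, assuming a hypothetical Folder
    // type in place of org.elasticsearch.common.blobstore.BlobContainer.
    public class StaleIndexCleanupSketch {

        static final class Folder {
            final Map<String, Folder> children = new HashMap<>();
            final Set<String> blobs = new HashSet<>();

            // Recursive delete: descend into children first, then drop own blobs,
            // mirroring the MockRepository#delete override in this patch.
            void delete() {
                children.values().forEach(Folder::delete);
                children.clear();
                blobs.clear();
            }
        }

        public static void main(String[] args) {
            // The "indices" container holds one folder per index UUID (made-up IDs).
            Folder indices = new Folder();
            indices.children.put("uuid-live", new Folder());
            indices.children.put("uuid-stale", new Folder()); // left over from an aborted delete

            // IDs still referenced by RepositoryData after the snapshot delete.
            Set<String> survivingIndexIds = new HashSet<>();
            survivingIndexIds.add("uuid-live");

            // Same shape as BlobStoreRepository#cleanupStaleIndices: anything found
            // in the blob store but absent from the surviving set is deleted recursively.
            indices.children.entrySet().removeIf(entry -> {
                if (survivingIndexIds.contains(entry.getKey()) == false) {
                    entry.getValue().delete();
                    return true;
                }
                return false;
            });

            System.out.println(indices.children.keySet()); // prints [uuid-live]
        }
    }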