From d456f7870a7cc001517793b96fcb1a118205b337 Mon Sep 17 00:00:00 2001
From: Armin Braun
Date: Tue, 14 Jul 2020 22:18:42 +0200
Subject: [PATCH] Deduplicate Index Metadata in BlobStore (#50278) (#59514)

This PR introduces two new fields into `RepositoryData` (index-N) to track the blob name of `IndexMetaData` blobs and
their content via setting generations and uuids. This is used to deduplicate the `IndexMetaData` blobs
(`meta-{uuid}.dat` in the indices folders under `/indices`) so that new metadata for an index is only written to the
repository during a snapshot if that same metadata can't be found in another snapshot. This saves one write per index
in the common case of unchanged metadata, thus saving cost and making snapshot finalization drastically faster if many
indices are being snapshotted at the same time.

The implementation is mostly analogous to that for shard generations in #46250 and piggybacks on the BwC mechanism
introduced in that PR (which means this PR needs adjustments if it doesn't go into `7.6`).

Relates to #45736 as it improves the efficiency of snapshotting unchanged indices.
Relates to #49800 as it has the potential of making the concurrent loading of index metadata for multiple snapshots of
the same index much more efficient, speeding up future concurrent snapshot deletes.
---
 .../s3/S3BlobStoreRepositoryTests.java | 4 +-
 .../MultiVersionRepositoryAccessIT.java | 2 +-
 .../CorruptedBlobStoreRepositoryIT.java | 13 +-
 .../DedicatedClusterSnapshotRestoreIT.java | 83 ++++++++
 ...etadataLoadingDuringSnapshotRestoreIT.java | 7 +-
 .../SharedClusterSnapshotRestoreIT.java | 3 +-
 .../TransportSnapshotsStatusAction.java | 2 +-
 .../repositories/FilterRepository.java | 4 +-
 .../IndexMetaDataGenerations.java | 177 ++++++++++++++++
 .../repositories/Repository.java | 3 +-
 .../repositories/RepositoryData.java | 120 +++++++++--
 .../blobstore/BlobStoreRepository.java | 189 +++++++++++-------
 .../snapshots/RestoreService.java | 2 +-
 .../snapshots/SnapshotsService.java | 13 +-
 .../RepositoriesServiceTests.java | 3 +-
 .../repositories/RepositoryDataTests.java | 95 +++++++--
 .../blobstore/BlobStoreRepositoryTests.java | 13 +-
 .../snapshots/SnapshotResiliencyTests.java | 2 +-
 .../index/shard/RestoreOnlyRepository.java | 6 +-
 .../blobstore/BlobStoreTestUtil.java | 27 ++-
 .../AbstractSnapshotIntegTestCase.java | 6 +-
 .../xpack/ccr/repository/CcrRepository.java | 6 +-
 .../SourceOnlySnapshotShardTests.java | 3 +-
 23 files changed, 644 insertions(+), 139 deletions(-)
 create mode 100644 server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java

diff --git a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java index e240485fe62..fb8215294e1 100644 --- a/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java +++ b/plugins/repository-s3/src/test/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java @@ -160,8 +160,8 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes final RepositoryData repositoryData = getRepositoryData(repository); final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion())); - final BytesReference serialized =
BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false)); + final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), + SnapshotsService.OLD_SNAPSHOT_FORMAT)); PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> { try (InputStream stream = serialized.streamInput()) { repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic( diff --git a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java index f3b02c71578..616b19345c2 100644 --- a/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java +++ b/qa/repository-multi-version/src/test/java/org/elasticsearch/upgrades/MultiVersionRepositoryAccessIT.java @@ -218,7 +218,7 @@ public class MultiVersionRepositoryAccessIT extends ESRestTestCase { ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards); } } else { - if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) { + if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) { assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER)); final List> expectedExceptions = Arrays.asList(ResponseException.class, ElasticsearchStatusException.class); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java index d4f3d117732..c12c0b5d17c 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/CorruptedBlobStoreRepositoryIT.java @@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryData; @@ -273,11 +274,12 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas SnapshotId::getUUID, Function.identity())), repositoryData.getSnapshotIds().stream().collect(Collectors.toMap( SnapshotId::getUUID, repositoryData::getSnapshotState)), - Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); + Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()), - BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), - true))), StandardOpenOption.TRUNCATE_EXISTING); + BytesReference.toBytes(BytesReference.bytes( + withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), + StandardOpenOption.TRUNCATE_EXISTING); logger.info("--> verify that repo is assumed in old metadata format"); final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class); @@ -403,11 +405,12 @@ public class CorruptedBlobStoreRepositoryIT extends 
AbstractSnapshotIntegTestCas Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)), repositoryData1.getIndices().values().stream().collect( Collectors.toMap(Function.identity(), repositoryData1::getSnapshots) - ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build() + ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(), + repositoryData1.indexMetaDataGenerations() ); Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()), BytesReference.toBytes(BytesReference.bytes( - brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))), + brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))), StandardOpenOption.TRUNCATE_EXISTING); logger.info("--> recreating repository to clear caches"); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java index 7e33d11d1db..71ce2cfe4f4 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/DedicatedClusterSnapshotRestoreIT.java @@ -73,9 +73,11 @@ import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.node.Node; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.RepositoryMissingException; +import org.elasticsearch.repositories.blobstore.BlobStoreRepository; import org.elasticsearch.rest.AbstractRestChannel; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction; import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -996,6 +998,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest SnapshotStats stats = snapshots.get(0).getStats(); + final List snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount)); assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize)); @@ -1023,6 +1027,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest .get(); final List snapshot1Files = scanSnapshotFolder(repoPath); + final List snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath); + + // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository + assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles)); final int snapshot1FileCount = snapshot1Files.size(); final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files); @@ -1047,6 +1055,65 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize)); } + public void testDeduplicateIndexMetadata() throws Exception { + final String indexName = "test-blocks-1"; + final String repositoryName = "repo-" + indexName; + final String snapshot0 = "snapshot-0"; + final String snapshot1 = "snapshot-1"; + final String snapshot2 = "snapshot-2"; + + 
createIndex(indexName); + + int docs = between(10, 100); + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet(); + } + + final Path repoPath = randomRepoPath(); + createRepository(repositoryName, "fs", repoPath); + + logger.info("--> create a snapshot"); + client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0) + .setIncludeGlobalState(true) + .setWaitForCompletion(true) + .get(); + + final List snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index + + docs = between(1, 5); + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet(); + } + + logger.info("--> restart random data node and add new data node to change index allocation"); + internalCluster().restartRandomDataNode(); + internalCluster().startDataOnlyNode(); + ensureGreen(indexName); + + assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(), + equalTo(RestStatus.OK)); + + final List snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath); + + // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository + assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles)); + + // index to some other field to trigger a change in index metadata + for (int i = 0; i < docs; i++) { + client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet(); + } + assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(), + equalTo(RestStatus.OK)); + + final List snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob + + assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get()); + final List snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath); + assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots + } + public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception { logger.info("--> starting a master node and two data nodes"); internalCluster().startMasterOnlyNode(); @@ -1256,6 +1323,22 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest }).sum(); } + private static List findRepoMetaBlobs(Path repoPath) throws IOException { + List files = new ArrayList<>(); + Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + final String fileName = file.getFileName().toString(); + if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) { + files.add(file); + } + return super.visitFile(file, attrs); + } + } + ); + return files; + } + private List scanSnapshotFolder(Path repoPath) throws IOException { List files = new ArrayList<>(); Files.walkFileTree(repoPath, new SimpleFileVisitor(){ diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java index c12a0b9b764..2850eb777dc 100644 --- 
a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/MetadataLoadingDuringSnapshotRestoreIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse; import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse; import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse; +import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.RepositoryMetadata; @@ -34,6 +35,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.repositories.IndexId; import org.elasticsearch.repositories.RepositoriesService; import org.elasticsearch.repositories.Repository; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.snapshots.mockstore.MockRepository; @@ -198,9 +200,10 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, + IndexId indexId) throws IOException { indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet(); - return super.getSnapshotIndexMetadata(snapshotId, indexId); + return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId); } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 2666b243419..7234c192209 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -2546,7 +2546,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas final IndexId corruptedIndex = randomFrom(indexIds.values()); final Path indexMetadataPath = repo.resolve("indices") .resolve(corruptedIndex.getId()) - .resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat"); + .resolve( + "meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat"); // Truncate the index metadata file try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) { diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index c9c4a469638..e2dea018b4f 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -334,7 +334,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction shardStatus = new 
HashMap<>(); for (String index : snapshotInfo.indices()) { IndexId indexId = repositoryData.resolveIndexId(index); - IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId); + IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId); if (indexMetadata != null) { int numberOfShards = indexMetadata.getNumberOfShards(); for (int i = 0; i < numberOfShards; i++) { diff --git a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java index b6ac4958975..588b85f23d5 100644 --- a/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/FilterRepository.java @@ -70,8 +70,8 @@ public class FilterRepository implements Repository { } @Override - public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException { - return in.getSnapshotIndexMetadata(snapshotId, index); + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { + return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index); } @Override diff --git a/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java new file mode 100644 index 00000000000..bc1b6ae8b43 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/repositories/IndexMetaDataGenerations.java @@ -0,0 +1,177 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.repositories; + +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.snapshots.SnapshotId; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; + +/** + * Tracks the blob uuids of blobs containing {@link IndexMetadata} for snapshots as well an identifier for each of these blobs. + * Before writing a new {@link IndexMetadata} blob during snapshot finalization in + * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#finalizeSnapshot} the identifier for an instance of + * {@link IndexMetadata} should be computed and then used to check if it already exists in the repository via + * {@link #getIndexMetaBlobId(String)}. 
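+ * A minimal sketch of that check-then-write flow (hypothetical caller code, mirroring the logic this change adds to
+ * {@code BlobStoreRepository#finalizeSnapshot}; {@code index} and {@code indexMetaData} are assumed to be in scope):
+ * <pre>{@code
+ * String identifier = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
+ * String metaUUID = repositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifier);
+ * if (metaUUID == null) {
+ *     // identifier not seen in any other snapshot: write a new meta-{uuid}.dat blob under a fresh uuid
+ *     metaUUID = UUIDs.base64UUID();
+ *     indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false);
+ * }
+ * // otherwise the already existing blob is simply reused and no write happens for this index
+ * }</pre>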
+ */ +public final class IndexMetaDataGenerations { + + public static final IndexMetaDataGenerations EMPTY = new IndexMetaDataGenerations(Collections.emptyMap(), Collections.emptyMap()); + + /** + * Map of {@link SnapshotId} to a map of the indices in a snapshot mapping {@link IndexId} to metadata identifiers. + * The identifiers in the nested map can be mapped to the relevant blob uuid via {@link #getIndexMetaBlobId}. + */ + final Map> lookup; + + /** + * Map of index metadata identifier to blob uuid. + */ + final Map identifiers; + + IndexMetaDataGenerations(Map> lookup, Map identifiers) { + assert identifiers.keySet().equals(lookup.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet())) : + "identifier mappings " + identifiers + " don't track the same blob ids as the lookup map " + lookup; + assert lookup.values().stream().noneMatch(Map::isEmpty) : "Lookup contained empty map [" + lookup + "]"; + this.lookup = Collections.unmodifiableMap(lookup); + this.identifiers = Collections.unmodifiableMap(identifiers); + } + + public boolean isEmpty() { + return identifiers.isEmpty(); + } + + /** + * Gets the blob id by the identifier of {@link org.elasticsearch.cluster.metadata.IndexMetadata} + * (computed via {@link #buildUniqueIdentifier}) or {@code null} if none is tracked for the identifier. + * + * @param metaIdentifier identifier for {@link IndexMetadata} + * @return blob id for the given metadata identifier or {@code null} if the identifier is not part of the repository yet + */ + @Nullable + public String getIndexMetaBlobId(String metaIdentifier) { + return identifiers.get(metaIdentifier); + } + + /** + * Get the blob id by {@link SnapshotId} and {@link IndexId} and fall back to the value of {@link SnapshotId#getUUID()} if none is + * known to enable backwards compatibility with versions older than + * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} which used the snapshot uuid as index metadata + * blob uuid. + * + * @param snapshotId Snapshot Id + * @param indexId Index Id + * @return blob id for the given index metadata + */ + public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) { + final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId); + if (identifier == null) { + return snapshotId.getUUID(); + } else { + return identifiers.get(identifier); + } + } + + /** + * Create a new instance with the given snapshot and index metadata uuids and identifiers added. 
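+ * A sketch with hypothetical identifier and blob uuid values:
+ * <pre>{@code
+ * IndexMetaDataGenerations gens = IndexMetaDataGenerations.EMPTY.withAddedSnapshot(snapshotId,
+ *     Collections.singletonMap(indexId, "identifier-A"),        // index -> metadata identifier
+ *     Collections.singletonMap("identifier-A", "blob-uuid-1")); // identifier -> blob uuid
+ * assert "blob-uuid-1".equals(gens.indexMetaBlobId(snapshotId, indexId));
+ * }</pre>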
+ * + * @param snapshotId SnapshotId + * @param newLookup new mappings of index + snapshot to index metadata identifier + * @param newIdentifiers new mappings of index metadata identifier to blob id + * @return instance with added snapshot + */ + public IndexMetaDataGenerations withAddedSnapshot(SnapshotId snapshotId, Map newLookup, + Map newIdentifiers) { + final Map> updatedIndexMetaLookup = new HashMap<>(this.lookup); + final Map updatedIndexMetaIdentifiers = new HashMap<>(identifiers); + updatedIndexMetaIdentifiers.putAll(newIdentifiers); + updatedIndexMetaLookup.compute(snapshotId, (snId, lookup) -> { + if (lookup == null) { + if (newLookup.isEmpty()) { + return null; + } + return Collections.unmodifiableMap(new HashMap<>(newLookup)); + } else { + final Map updated = new HashMap<>(lookup); + updated.putAll(newLookup); + return Collections.unmodifiableMap(updated); + } + }); + return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers); + } + + /** + * Create a new instance with the given snapshot removed. + * + * @param snapshotIds SnapshotIds to remove + * @return new instance without the given snapshot + */ + public IndexMetaDataGenerations withRemovedSnapshots(Collection snapshotIds) { + final Map> updatedIndexMetaLookup = new HashMap<>(lookup); + updatedIndexMetaLookup.keySet().removeAll(snapshotIds); + final Map updatedIndexMetaIdentifiers = new HashMap<>(identifiers); + updatedIndexMetaIdentifiers.keySet().removeIf( + k -> updatedIndexMetaLookup.values().stream().noneMatch(identifiers -> identifiers.containsValue(k))); + return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers); + } + + @Override + public int hashCode() { + return Objects.hash(identifiers, lookup); + } + + @Override + public boolean equals(Object that) { + if (this == that) { + return true; + } + if (that instanceof IndexMetaDataGenerations == false) { + return false; + } + final IndexMetaDataGenerations other = (IndexMetaDataGenerations) that; + return lookup.equals(other.lookup) && identifiers.equals(other.identifiers); + } + + @Override + public String toString() { + return "IndexMetaDataGenerations{lookup:" + lookup + "}{identifier:" + identifiers + "}"; + } + + /** + * Compute identifier for {@link IndexMetadata} from its index- and history-uuid as well as its settings-, mapping- and alias-version. + * If an index did not see a change in its settings, mappings or aliases between two points in time then the identifier will not change + * between them either. + * + * @param indexMetaData IndexMetaData + * @return identifier string + */ + public static String buildUniqueIdentifier(IndexMetadata indexMetaData) { + return indexMetaData.getIndexUUID() + + "-" + indexMetaData.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID, IndexMetadata.INDEX_UUID_NA_VALUE) + + "-" + indexMetaData.getSettingsVersion() + "-" + indexMetaData.getMappingVersion() + + "-" + indexMetaData.getAliasesVersion(); + } +} diff --git a/server/src/main/java/org/elasticsearch/repositories/Repository.java b/server/src/main/java/org/elasticsearch/repositories/Repository.java index 1fb3bca5fc6..727ee7eef10 100644 --- a/server/src/main/java/org/elasticsearch/repositories/Repository.java +++ b/server/src/main/java/org/elasticsearch/repositories/Repository.java @@ -102,11 +102,12 @@ public interface Repository extends LifecycleComponent { /** * Returns the index metadata associated with the snapshot. 
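 * A sketch of a typical call under the new signature (hypothetical caller; the same pattern this change applies in
 * {@code MetadataLoadingDuringSnapshotRestoreIT}):
 * <pre>{@code
 * RepositoryData repoData = PlainActionFuture.get(repository::getRepositoryData);
 * IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repoData, snapshotId, indexId);
 * }</pre>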
* + * @param repositoryData current {@link RepositoryData} * @param snapshotId the snapshot id to load the index metadata from * @param index the {@link IndexId} to load the metadata from * @return the index metadata about the given index for the given snapshot */ - IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException; + IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException; /** * Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots diff --git a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java index 6e4e4911d6d..fd98287376d 100644 --- a/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java +++ b/server/src/main/java/org/elasticsearch/repositories/RepositoryData.java @@ -40,6 +40,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -67,8 +68,8 @@ public final class RepositoryData { /** * An instance initialized for an empty repository. */ - public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, - Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY); + public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), + Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY); /** * The generational id of the index file from which the repository data was read. @@ -93,6 +94,11 @@ public final class RepositoryData { private final Map snapshotVersions; + /** + * Index metadata generations. + */ + private final IndexMetaDataGenerations indexMetaDataGenerations; + /** * Shard generations. 
*/ @@ -100,7 +106,7 @@ public final class RepositoryData { public RepositoryData(long genId, Map snapshotIds, Map snapshotStates, Map snapshotVersions, Map> indexSnapshots, - ShardGenerations shardGenerations) { + ShardGenerations shardGenerations, IndexMetaDataGenerations indexMetaDataGenerations) { this.genId = genId; this.snapshotIds = Collections.unmodifiableMap(snapshotIds); this.snapshotStates = Collections.unmodifiableMap(snapshotStates); @@ -108,6 +114,7 @@ public final class RepositoryData { .collect(Collectors.toMap(IndexId::getName, Function.identity()))); this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots); this.shardGenerations = Objects.requireNonNull(shardGenerations); + this.indexMetaDataGenerations = indexMetaDataGenerations; this.snapshotVersions = snapshotVersions; assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices " + shardGenerations.indices() + " but snapshots only reference indices " + indices.values(); @@ -116,7 +123,8 @@ public final class RepositoryData { } protected RepositoryData copy() { - return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -130,7 +138,8 @@ public final class RepositoryData { } final Map newVersions = new HashMap<>(snapshotVersions); versions.forEach((id, version) -> newVersions.put(id.getUUID(), version)); - return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } public ShardGenerations shardGenerations() { @@ -198,6 +207,32 @@ public final class RepositoryData { }).map(Map.Entry::getKey).collect(Collectors.toList()); } + /** + * Returns a map of {@link IndexId} to a collection of {@link String} containing all the {@link IndexId} and the + * {@link org.elasticsearch.cluster.metadata.IndexMetadata} blob name in it that can be removed after removing the given snapshot from + * the repository. + * NOTE: Does not return a mapping for {@link IndexId} values that will be removed completely from the repository. + * + * @param snapshotIds SnapshotIds to remove + * @return map of index to index metadata blob id to delete + */ + public Map> indexMetaDataToRemoveAfterRemovingSnapshots(Collection snapshotIds) { + Collection indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds); + final Set allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet().stream() + .filter(e -> snapshotIds.contains(e.getKey()) == false).flatMap(e -> e.getValue().values().stream()) + .map(indexMetaDataGenerations::getIndexMetaBlobId).collect(Collectors.toSet()); + final Map> toRemove = new HashMap<>(); + for (IndexId indexId : indicesForSnapshot) { + for (SnapshotId snapshotId : snapshotIds) { + final String identifier = indexMetaDataGenerations.indexMetaBlobId(snapshotId, indexId); + if (allRemainingIdentifiers.contains(identifier) == false) { + toRemove.computeIfAbsent(indexId, k -> new HashSet<>()).add(identifier); + } + } + } + return toRemove; + } + /** * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot * already exists in the repository data, this method throws an IllegalArgumentException. 
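 * A sketch of a finalization-time call with the new metadata arguments (hypothetical values; the two trailing maps are
 * the ones assembled by the deduplication logic in {@code BlobStoreRepository#finalizeSnapshot}):
 * <pre>{@code
 * RepositoryData updated = repositoryData.addSnapshot(snapshotId, SnapshotState.SUCCESS, Version.CURRENT,
 *     shardGenerations,
 *     Collections.singletonMap(indexId, identifier),            // index -> metadata identifier
 *     Collections.singletonMap(identifier, "meta-blob-uuid"));  // identifier -> blob uuid
 * }</pre>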
@@ -206,11 +241,16 @@ public final class RepositoryData { * @param snapshotState State of the new snapshot * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new * generations indexed by the shard id they correspond to must be supplied. + * @param indexMetaBlobs Map of index metadata blob uuids + * @param newIdentifiers Map of new index metadata blob uuids keyed by the identifiers of the + * {@link org.elasticsearch.cluster.metadata.IndexMetadata} in them */ public RepositoryData addSnapshot(final SnapshotId snapshotId, final SnapshotState snapshotState, final Version version, - final ShardGenerations shardGenerations) { + final ShardGenerations shardGenerations, + @Nullable final Map indexMetaBlobs, + @Nullable final Map newIdentifiers) { if (snapshotIds.containsKey(snapshotId.getUUID())) { // if the snapshot id already exists in the repository data, it means an old master // that is blocked from the cluster is trying to finalize a snapshot concurrently with @@ -235,8 +275,23 @@ public final class RepositoryData { allIndexSnapshots.put(indexId, Collections.unmodifiableList(copy)); } } + + final IndexMetaDataGenerations newIndexMetaGenerations; + if (indexMetaBlobs == null) { + assert newIdentifiers == null : "Non-null new identifiers [" + newIdentifiers + "] for null lookup"; + assert indexMetaDataGenerations.lookup.isEmpty() : + "Index meta generations should have been empty but was [" + indexMetaDataGenerations + "]"; + newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY; + } else { + assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet()) : + "Shard generations contained indices " + shardGenerations.indices() + + " but indexMetaData was given for " + indexMetaBlobs.keySet(); + newIndexMetaGenerations = indexMetaDataGenerations.withAddedSnapshot(snapshotId, indexMetaBlobs, newIdentifiers); + } + return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots, - ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build()); + ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(), + newIndexMetaGenerations); } /** @@ -249,7 +304,8 @@ public final class RepositoryData { if (newGeneration == genId) { return this; } - return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations); + return new RepositoryData( + newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -291,7 +347,8 @@ public final class RepositoryData { return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots, ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations) - .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build() + .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build(), + indexMetaDataGenerations.withRemovedSnapshots(snapshots) ); } @@ -320,12 +377,14 @@ public final class RepositoryData { && snapshotVersions.equals(that.snapshotVersions) && indices.equals(that.indices) && indexSnapshots.equals(that.indexSnapshots) - && shardGenerations.equals(that.shardGenerations); + && shardGenerations.equals(that.shardGenerations) + && indexMetaDataGenerations.equals(that.indexMetaDataGenerations); } @Override public int hashCode() { - return Objects.hash(snapshotIds, snapshotStates, 
snapshotVersions, indices, indexSnapshots, shardGenerations); + return Objects.hash( + snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations, indexMetaDataGenerations); } /** @@ -366,6 +425,8 @@ public final class RepositoryData { } private static final String SHARD_GENERATIONS = "shard_generations"; + private static final String INDEX_METADATA_IDENTIFIERS = "index_metadata_identifiers"; + private static final String INDEX_METADATA_LOOKUP = "index_metadata_lookup"; private static final String SNAPSHOTS = "snapshots"; private static final String INDICES = "indices"; private static final String INDEX_ID = "id"; @@ -378,10 +439,12 @@ public final class RepositoryData { /** * Writes the snapshots metadata and the related indices metadata to x-content. */ - public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException { + public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException { builder.startObject(); // write the snapshots list builder.startArray(SNAPSHOTS); + final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion); + final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion); for (final SnapshotId snapshot : getSnapshotIds()) { builder.startObject(); builder.field(NAME, snapshot.getName()); @@ -389,6 +452,10 @@ public final class RepositoryData { if (snapshotStates.containsKey(snapshot.getUUID())) { builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value()); } + if (shouldWriteIndexGens) { + builder.field(INDEX_METADATA_LOOKUP, indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap()) + .entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue))); + } if (snapshotVersions.containsKey(snapshot.getUUID())) { builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString()); } @@ -417,7 +484,10 @@ public final class RepositoryData { builder.endObject(); } builder.endObject(); - if (shouldWriteShardGens) { + if (shouldWriteIndexGens) { + builder.field(MIN_VERSION, SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION.toString()); + builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers); + } else if (shouldWriteShardGens) { // Add min version field to make it impossible for older ES versions to deserialize this object builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString()); } @@ -425,6 +495,10 @@ public final class RepositoryData { return builder; } + public IndexMetaDataGenerations indexMetaDataGenerations() { + return indexMetaDataGenerations; + } + /** * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata. 
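 * An abbreviated sketch of an index-N blob carrying the two new fields (all values hypothetical, unrelated fields
 * elided):
 * <pre>{@code
 * {
 *   "snapshots": [ {
 *     "name": "snap-1", "uuid": "snap-uuid-1", "state": 1,
 *     "index_metadata_lookup": { "index-uuid-1": "identifier-A" },
 *     "version": "7.9.0"
 *   } ],
 *   "indices": { ... },
 *   "min_version": "7.9.0",
 *   "index_metadata_identifiers": { "identifier-A": "meta-blob-uuid-1" }
 * }
 * }</pre>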
* @@ -438,6 +512,8 @@ public final class RepositoryData { final Map snapshotVersions = new HashMap<>(); final Map> indexSnapshots = new HashMap<>(); final ShardGenerations.Builder shardGenerations = ShardGenerations.builder(); + final Map indexMetaIdentifiers = new HashMap<>(); + final Map> indexMetaLookup = new HashMap<>(); if (parser.nextToken() == XContentParser.Token.START_OBJECT) { while (parser.nextToken() == XContentParser.Token.FIELD_NAME) { @@ -448,6 +524,7 @@ public final class RepositoryData { String name = null; String uuid = null; SnapshotState state = null; + Map metaGenerations = new HashMap<>(); Version version = null; while (parser.nextToken() != XContentParser.Token.END_OBJECT) { String currentFieldName = parser.currentName(); @@ -458,6 +535,8 @@ public final class RepositoryData { uuid = parser.text(); } else if (STATE.equals(currentFieldName)) { state = SnapshotState.fromValue(parser.numberValue().byteValue()); + } else if (INDEX_METADATA_LOOKUP.equals(currentFieldName)) { + metaGenerations.putAll(parser.mapStrings()); } else if (VERSION.equals(currentFieldName)) { version = Version.fromString(parser.text()); } @@ -470,6 +549,9 @@ public final class RepositoryData { snapshotVersions.put(uuid, version); } snapshots.put(snapshotId.getUUID(), snapshotId); + if (metaGenerations.isEmpty() == false) { + indexMetaLookup.put(snapshotId, metaGenerations); + } } } else { throw new ElasticsearchParseException("expected array for [" + field + "]"); @@ -545,6 +627,11 @@ public final class RepositoryData { } } } + } else if (INDEX_METADATA_IDENTIFIERS.equals(field)) { + if (parser.nextToken() != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("start object expected [" + INDEX_METADATA_IDENTIFIERS + "]"); + } + indexMetaIdentifiers.putAll(parser.mapStrings()); } else if (MIN_VERSION.equals(field)) { if (parser.nextToken() != XContentParser.Token.VALUE_STRING) { throw new ElasticsearchParseException("version string expected [min_version]"); @@ -558,7 +645,12 @@ public final class RepositoryData { } else { throw new ElasticsearchParseException("start object expected"); } - return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build()); + final Map indexLookup = + indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getId, Function.identity())); + return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build(), + new IndexMetaDataGenerations(indexMetaLookup.entrySet().stream().collect( + Collectors.toMap(Map.Entry::getKey, e -> e.getValue().entrySet().stream() + .collect(Collectors.toMap(entry -> indexLookup.get(entry.getKey()), Map.Entry::getValue)))), indexMetaIdentifiers)); } } diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index 89ccf6824d4..97ecfaf3a35 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -79,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.AbstractRunnable; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import 
org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.XContentBuilder; @@ -100,6 +101,7 @@ import org.elasticsearch.index.store.StoreFileMetadata; import org.elasticsearch.indices.recovery.RecoverySettings; import org.elasticsearch.indices.recovery.RecoveryState; import org.elasticsearch.repositories.IndexId; +import org.elasticsearch.repositories.IndexMetaDataGenerations; import org.elasticsearch.repositories.Repository; import org.elasticsearch.repositories.RepositoryCleanupResult; import org.elasticsearch.repositories.RepositoryData; @@ -579,7 +581,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp // delete an index that was created by another master node after writing this index-N blob. final Map foundIndices = blobStore().blobContainer(indicesPath()).children(); doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData, - SnapshotsService.useShardGenerations(repositoryMetaVersion), listener); + repositoryMetaVersion, listener); } @Override @@ -639,10 +641,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * @param listener Listener to invoke once finished */ private void doDeleteShardSnapshots(Collection snapshotIds, long repositoryStateId, Map foundIndices, - Map rootBlobs, RepositoryData repositoryData, boolean writeShardGens, + Map rootBlobs, RepositoryData repositoryData, Version repoMetaVersion, ActionListener listener) { - if (writeShardGens) { + if (SnapshotsService.useShardGenerations(repoMetaVersion)) { // First write the new shard state metadata (with the removed snapshot) and compute deletion targets final StepListener> writeShardMetaDataAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep); @@ -660,7 +662,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration); } final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build()); - writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(), + writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure)); }, listener::onFailure); // Once we have updated the repository, run the clean-ups @@ -669,12 +671,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener); - asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener); + asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), + afterCleanupsListener); }, listener::onFailure); } else { // Write the new repository data first (with the removed snapshot), using no shard generations final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY); - writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> 
{ + writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> { // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion final ActionListener afterCleanupsListener = new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); @@ -682,7 +685,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final StepListener> writeMetaAndComputeDeletesStep = new StepListener<>(); writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep); writeMetaAndComputeDeletesStep.whenComplete(deleteResults -> - asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, deleteResults, afterCleanupsListener), + asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener), afterCleanupsListener::onFailure); }, listener::onFailure)); } @@ -696,14 +699,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null)))); } - private void asyncCleanupUnlinkedShardLevelBlobs(Collection snapshotIds, + private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults, ActionListener listener) { threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap( listener, l -> { try { - deleteFromContainer(blobContainer(), resolveFilesToDelete(snapshotIds, deleteResults)); + deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults)); l.onResponse(null); } catch (Exception e) { logger.warn( @@ -735,14 +738,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp final Set survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream() .filter(id -> snapshotIds.contains(id) == false).collect(Collectors.toSet()); final StepListener> shardCountListener = new StepListener<>(); - final ActionListener allShardCountsListener = new GroupedActionListener<>(shardCountListener, snapshotIds.size()); - for (SnapshotId snapshotId : snapshotIds) { + final Collection indexMetaGenerations = snapshotIds.stream().map( + id -> oldRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)).collect(Collectors.toSet()); + final ActionListener allShardCountsListener = + new GroupedActionListener<>(shardCountListener, indexMetaGenerations.size()); + final BlobContainer indexContainer = indexContainer(indexId); + for (String indexMetaGeneration : indexMetaGenerations) { executor.execute(ActionRunnable.supply(allShardCountsListener, () -> { try { - return getSnapshotIndexMetadata(snapshotId, indexId).getNumberOfShards(); + return indexMetadataFormat.read(indexContainer, indexMetaGeneration).getNumberOfShards(); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage( - "[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); + "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex); // Just invoke the listener without any shard generations to count it down, this index will be cleaned up // by the stale data cleanup in the end. // TODO: Getting here means repository corruption. 
We should find a way of dealing with this instead of just @@ -797,20 +804,22 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } } - private List resolveFilesToDelete(Collection snapshotIds, + private List resolveFilesToDelete(RepositoryData oldRepositoryData, Collection snapshotIds, Collection deleteResults) { final String basePath = basePath().buildAsString(); final int basePathLen = basePath.length(); + final Map> indexMetaGenerations = + oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds); return Stream.concat( - deleteResults.stream().flatMap(shardResult -> { - final String shardPath = - shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); - return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); - }), - deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().flatMap(indexId -> { - final String indexContainerPath = indexContainer(indexId).path().buildAsString(); - return snapshotIds.stream().map(snapshotId -> indexContainerPath + globalMetadataFormat.blobName(snapshotId.getUUID())); - }) + deleteResults.stream().flatMap(shardResult -> { + final String shardPath = + shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); + return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + }), + indexMetaGenerations.entrySet().stream().flatMap(entry -> { + final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString(); + return entry.getValue().stream().map(id -> indexContainerPath + indexMetadataFormat.blobName(id)); + }) ).map(absolutePath -> { assert absolutePath.startsWith(basePath); return absolutePath.substring(basePathLen); @@ -854,6 +863,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the * repository. * TODO: Add shard level cleanups + * TODO: Add unreferenced index metadata cleanup *
     * <ul>
     *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
     *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
     * </ul>
@@ -878,7 +888,7 @@ listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO)); } else { // write new index-N blob to ensure concurrent operations will fail - writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion), + writeIndexGen(repositoryData, repositoryStateId, repositoryMetaVersion, Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(Collections.emptyList(), foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure)); } @@ -1002,49 +1012,82 @@ final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion); final Consumer onUpdateFailure = e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e)); - final ActionListener allMetaListener = new GroupedActionListener<>( - ActionListener.wrap(snapshotInfos -> { - assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; - final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); - getRepositoryData(ActionListener.wrap(existingRepositoryData -> { - final RepositoryData updatedRepositoryData = - existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations); - writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer, - ActionListener.wrap(writtenRepoData -> { - if (writeShardGens) { - cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); - } - listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo)); - }, onUpdateFailure)); - }, onUpdateFailure)); - }, onUpdateFailure), 2 + indices.size()); + final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); - // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will - // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the - // index or global metadata will be compatible with the segments written in this snapshot as well. 
- // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way - // that decrements the generation it points at + final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion); - // Write Global Metadata - executor.execute(ActionRunnable.run(allMetaListener, - () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false))); + final StepListener repoDataListener = new StepListener<>(); + executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData)); + repoDataListener.whenComplete(existingRepositoryData -> { - // write the index metadata for each index in the snapshot - for (IndexId index : indices) { - executor.execute(ActionRunnable.run(allMetaListener, () -> - indexMetadataFormat.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false))); - } + final Map indexMetas; + final Map indexMetaIdentifiers; + if (writeIndexGens) { + indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap(); + indexMetas = ConcurrentCollections.newConcurrentMap(); + } else { + indexMetas = null; + indexMetaIdentifiers = null; + } - executor.execute(ActionRunnable.supply(allMetaListener, () -> { - final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, - indices.stream().map(IndexId::getName).collect(Collectors.toList()), + final ActionListener allMetaListener = new GroupedActionListener<>( + ActionListener.wrap(snapshotInfos -> { + assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos; + final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next(); + final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot( + snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations, indexMetas, indexMetaIdentifiers); + writeIndexGen(updatedRepositoryData, repositoryStateId, repositoryMetaVersion, stateTransformer, + ActionListener.wrap( + newRepoData -> { + if (writeShardGens) { + cleanupOldShardGens(existingRepositoryData, updatedRepositoryData); + } + listener.onResponse(new Tuple<>(newRepoData, snapshotInfo)); + }, onUpdateFailure)); + }, onUpdateFailure), 2 + indices.size()); + + // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will + // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the + // index or global metadata will be compatible with the segments written in this snapshot as well. 
+ // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way + // that decrements the generation it points at + + // Write Global MetaData + executor.execute(ActionRunnable.run(allMetaListener, + () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false))); + + // write the index metadata for each index in the snapshot + for (IndexId index : indices) { + executor.execute(ActionRunnable.run(allMetaListener, () -> { + final IndexMetadata indexMetaData = clusterMetadata.index(index.getName()); + if (writeIndexGens) { + final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData); + String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers); + if (metaUUID == null) { + // We don't yet have this version of the metadata so we write it + metaUUID = UUIDs.base64UUID(); + indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false); + indexMetaIdentifiers.put(identifiers, metaUUID); + } + indexMetas.put(index, identifiers); + } else { + indexMetadataFormat.write( + clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false); + } + } + )); + } + executor.execute(ActionRunnable.supply(allMetaListener, () -> { + final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId, + indices.stream().map(IndexId::getName).collect(Collectors.toList()), new ArrayList<>(clusterMetadata.dataStreams().keySet()), - startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, - includeGlobalState, userMetadata); - snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); - return snapshotInfo; - })); + startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures, + includeGlobalState, userMetadata); + snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false); + return snapshotInfo; + })); + }, onUpdateFailure); } // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data @@ -1084,9 +1127,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp } @Override - public IndexMetadata getSnapshotIndexMetadata(final SnapshotId snapshotId, final IndexId index) throws IOException { + public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException { try { - return indexMetadataFormat.read(indexContainer(index), snapshotId.getUUID()); + return indexMetadataFormat.read(indexContainer(index), + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index)); } catch (NoSuchFileException e) { throw new SnapshotMissingException(metadata.name(), snapshotId, e); } @@ -1245,7 +1289,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp loaded = getRepositoryData(genToLoad); // We can cache in the most recent version here without regard to the actual repository metadata version since we're // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe - cacheRepositoryData(loaded, true); + cacheRepositoryData(loaded, Version.CURRENT); } listener.onResponse(loaded); return; @@ -1280,17 +1324,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * modification can lead to moving from a higher {@code N} to a lower {@code N} value which mean we 
         try {
-            return indexMetadataFormat.read(indexContainer(index), snapshotId.getUUID());
+            return indexMetadataFormat.read(indexContainer(index),
+                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index));
         } catch (NoSuchFileException e) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, e);
         }
@@ -1245,7 +1289,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     loaded = getRepositoryData(genToLoad);
                     // We can cache in the most recent version here without regard to the actual repository metadata version since we're
                     // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe
-                    cacheRepositoryData(loaded, true);
+                    cacheRepositoryData(loaded, Version.CURRENT);
                 }
                 listener.onResponse(loaded);
                 return;
@@ -1280,17 +1324,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * modification can lead to moving from a higher {@code N} to a lower {@code N} value which means we can't safely assume that a given
      * generation will always contain the same {@link RepositoryData}.
      *
-     * @param updated RepositoryData to cache if newer than the cache contents
-     * @param writeShardGens whether to cache shard generation values
+     * @param updated RepositoryData to cache if newer than the cache contents
+     * @param version version of the repository metadata that was cached
      */
-    private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) {
+    private void cacheRepositoryData(RepositoryData updated, Version version) {
         if (cacheRepositoryData && bestEffortConsistency == false) {
             final BytesReference serialized;
             BytesStreamOutput out = new BytesStreamOutput();
             try {
                 try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
                      XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
-                    updated.snapshotsToXContent(builder, writeShardGens);
+                    updated.snapshotsToXContent(builder, version);
                 }
                 serialized = out.bytes();
                 final int len = serialized.length();
@@ -1423,11 +1467,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      *
     * @param repositoryData RepositoryData to write
     * @param expectedGen    expected repository generation at the start of the operation
-     * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
+     * @param version        version of the repository metadata to write
     * @param stateFilter    filter for the last cluster state update executed by this method
     * @param listener       completion listener
     */
-    protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens,
+    protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, Version version,
                                  Function<ClusterState, ClusterState> stateFilter, ActionListener<RepositoryData> listener) {
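+        // Carrying the full repository metadata Version here (rather than one boolean per feature) lets snapshotsToXContent
+        // decide which BwC fields (shard generations, index metadata generations) to include in the new index-N blob.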
         assert isReadOnly() == false; // can not write to a read only repository
         final long currentGen = repositoryData.getGenId();
@@ -1538,7 +1582,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
             writeAtomic(indexBlob,
-                BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+                BytesReference.bytes(
+                    filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true);
             // write the current generation to the index-latest file
             final BytesReference genBytes;
             try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1579,7 +1624,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 @Override
                 public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                     final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen);
-                    cacheRepositoryData(writtenRepositoryData, writeShardGens);
+                    cacheRepositoryData(writtenRepositoryData, version);
                     threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
                         // Delete all now outdated index files up to 1000 blobs back from the new generation.
                         // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.
diff --git a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
index 9a55a515ad4..8b8e7c0e5d1 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/RestoreService.java
@@ -246,7 +246,7 @@ public class RestoreService implements ClusterStateApplier {
 
                     final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
                     for (IndexId indexId : indexIdsInSnapshot) {
-                        metadataBuilder.put(repository.getSnapshotIndexMetadata(snapshotId, indexId), false);
+                        metadataBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId), false);
                     }
 
                     final Metadata metadata = metadataBuilder.build();
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 7959901b089..00029f8ef33 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -122,6 +122,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
 
     public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;
 
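+    // Repositories written with this version or newer record index metadata generations in RepositoryData, enabling
+    // deduplication of index metadata blobs across snapshots.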
+    public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0;
+
     public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0;
 
     public static final Version MULTI_DELETE_VERSION = Version.V_7_8_0;
@@ -1505,7 +1507,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     }
 
     /**
-     * Deletes snapshot from repository
+     * Checks whether the metadata version supports writing index metadata generations to the repository.
+     *
+     * @param repositoryMetaVersion version to check
+     * @return true if version supports index metadata generations
+     */
+    public static boolean useIndexGenerations(Version repositoryMetaVersion) {
+        return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION);
+    }
+
+    /** Deletes snapshot from repository
      *
      * @param repoName          repository name
     * @param snapshotIds       snapshot ids
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
index e6bab4428d7..38afb94f815 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoriesServiceTests.java
@@ -49,7 +49,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -151,7 +150,7 @@ public class RepositoriesServiceTests extends ESTestCase {
         }
 
         @Override
-        public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
+        public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) {
             return null;
         }
 
diff --git a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
index 33c67f77ebc..584956bbffc 100644
--- a/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/RepositoryDataTests.java
@@ -34,6 +34,7 @@ import org.elasticsearch.test.ESTestCase;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -41,6 +42,8 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -74,7 +77,7 @@ public class RepositoryDataTests extends ESTestCase {
     public void testXContent() throws IOException {
         RepositoryData repositoryData = generateRandomRepoData();
         XContentBuilder builder = JsonXContent.contentBuilder();
-        repositoryData.snapshotsToXContent(builder, true);
+        repositoryData.snapshotsToXContent(builder, Version.CURRENT);
         try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
             long gen = (long) randomIntBetween(0, 500);
             RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen, randomBoolean());
@@ -106,9 +109,14 @@ public class RepositoryDataTests extends ESTestCase {
             indices.add(indexId);
             builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
         }
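+        // Give every index a synthetic metadata identifier (production code derives this from the index metadata via
+        // IndexMetaDataGenerations.buildUniqueIdentifier) and map each identifier to a random metadata blob UUID.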
+        final ShardGenerations shardGenerations = builder.build();
+        final Map<IndexId, String> indexLookup =
+            shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256)));
         RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot,
             randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED),
-            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
+            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
+            shardGenerations, indexLookup,
+            indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))));
         // verify that the new repository data has the new snapshot and its indices
         assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
         for (IndexId indexId : indices) {
@@ -132,12 +140,12 @@ public class RepositoryDataTests extends ESTestCase {
             snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values()));
             snapshotVersions.put(snapshotId.getUUID(), randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
         }
-        RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds,
-            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+        RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, Collections.emptyMap(), Collections.emptyMap(),
+            Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
         // test that initializing indices works
         Map<IndexId, Set<SnapshotId>> indices = randomIndices(snapshotIds);
-        RepositoryData newRepoData =
-            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, ShardGenerations.EMPTY);
+        RepositoryData newRepoData = new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices,
+            ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
         List<SnapshotId> expected = new ArrayList<>(repositoryData.getSnapshotIds());
         Collections.sort(expected);
         List<SnapshotId> actual = new ArrayList<>(newRepoData.getSnapshotIds());
@@ -153,7 +161,8 @@ public class RepositoryDataTests extends ESTestCase {
         List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
         assertThat(snapshotIds.size(), greaterThan(0));
         SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
-        RepositoryData newRepositoryData = repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY);
+        RepositoryData newRepositoryData =
+            repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY);
         // make sure the repository data's indices no longer contain the removed snapshot
         for (final IndexId indexId : newRepositoryData.getIndices().values()) {
             assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId));
@@ -173,8 +182,9 @@ public class RepositoryDataTests extends ESTestCase {
     public void testGetSnapshotState() {
         final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
         final SnapshotState state = randomFrom(SnapshotState.values());
-        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state,
-            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), ShardGenerations.EMPTY);
+        final RepositoryData repositoryData =
+            RepositoryData.EMPTY.addSnapshot(snapshotId, state, randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
+                ShardGenerations.EMPTY, Collections.emptyMap(), Collections.emptyMap());
         assertEquals(state, repositoryData.getSnapshotState(snapshotId));
         assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())));
     }
@@ -184,7 +194,7 @@ public class RepositoryDataTests extends ESTestCase {
         final RepositoryData repositoryData = generateRandomRepoData();
 
         XContentBuilder builder = XContentBuilder.builder(xContent);
-        repositoryData.snapshotsToXContent(builder, true);
+        repositoryData.snapshotsToXContent(builder, Version.CURRENT);
         RepositoryData parsedRepositoryData;
         try (XContentParser xParser = createParser(builder)) {
             parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId(), randomBoolean());
@@ -219,10 +229,10 @@ public class RepositoryDataTests extends ESTestCase {
         assertNotNull(corruptedIndexId);
 
         RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates,
-            snapshotVersions, indexSnapshots, shardGenBuilder.build());
+            snapshotVersions, indexSnapshots, shardGenBuilder.build(), IndexMetaDataGenerations.EMPTY);
 
         final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
-        corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true);
+        corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, Version.CURRENT);
 
         try (XContentParser xParser = createParser(corruptedBuilder)) {
             ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->
@@ -269,6 +279,57 @@ public class RepositoryDataTests extends ESTestCase {
         }
     }
 
+    // Test removing snapshot from random data where no two snapshots share any index metadata blobs
+    public void testIndexMetaDataToRemoveAfterRemovingSnapshotNoSharing() {
+        final RepositoryData repositoryData = generateRandomRepoData();
+        final SnapshotId snapshotId = randomFrom(repositoryData.getSnapshotIds());
+        final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations();
+        final Collection<IndexId> indicesToUpdate = repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(snapshotId));
+        final Map<IndexId, Collection<String>> identifiersToRemove = indexMetaDataGenerations.lookup.get(snapshotId).entrySet().stream()
+            .filter(e -> indicesToUpdate.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey,
+                e -> Collections.singleton(indexMetaDataGenerations.getIndexMetaBlobId(e.getValue()))));
+        assertEquals(repositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(snapshotId)), identifiersToRemove);
+    }
+
+    // Test removing snapshot from random data that has some or all index metadata shared
+    public void testIndexMetaDataToRemoveAfterRemovingSnapshotWithSharing() {
+        final RepositoryData repositoryData = generateRandomRepoData();
+        final ShardGenerations.Builder builder = ShardGenerations.builder();
+        final SnapshotId otherSnapshotId = randomFrom(repositoryData.getSnapshotIds());
+        final Collection<IndexId> indicesInOther = repositoryData.getIndices().values()
+            .stream()
+            .filter(index -> repositoryData.getSnapshots(index).contains(otherSnapshotId))
+            .collect(Collectors.toSet());
+        for (IndexId indexId : indicesInOther) {
+            builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
+        }
+        final Map<IndexId, String> newIndices = new HashMap<>();
+        final Map<String, String> newIdentifiers = new HashMap<>();
+        final Map<IndexId, Collection<String>> removeFromOther = new HashMap<>();
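+        // The new snapshot reuses the other snapshot's metadata identifiers except for a random subset of indices that get fresh
+        // metadata; where that subset overlaps the other snapshot's indices, the other snapshot's meta blob is no longer shared
+        // and must be deleted together with that snapshot.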
+        for (IndexId indexId : randomSubsetOf(repositoryData.getIndices().values())) {
+            if (indicesInOther.contains(indexId)) {
+                removeFromOther.put(indexId, Collections.singleton(
+                    repositoryData.indexMetaDataGenerations().indexMetaBlobId(otherSnapshotId, indexId)));
+            }
+            final String identifier = randomAlphaOfLength(20);
+            newIndices.put(indexId, identifier);
+            newIdentifiers.put(identifier, UUIDs.randomBase64UUID(random()));
+            builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
+        }
+        final ShardGenerations shardGenerations = builder.build();
+        final Map<IndexId, String> indexLookup = new HashMap<>(repositoryData.indexMetaDataGenerations().lookup.get(otherSnapshotId));
+        indexLookup.putAll(newIndices);
+        final SnapshotId newSnapshot = new SnapshotId(randomAlphaOfLength(7), UUIDs.randomBase64UUID(random()));
+
+        RepositoryData newRepoData =
+            repositoryData.addSnapshot(newSnapshot, SnapshotState.SUCCESS, Version.CURRENT, shardGenerations, indexLookup, newIdentifiers);
+        assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(newSnapshot)),
+            newIndices.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                e -> Collections.singleton(newIdentifiers.get(e.getValue())))));
+        assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(otherSnapshotId)), removeFromOther);
+    }
+
     public static RepositoryData generateRandomRepoData() {
         final int numIndices = randomIntBetween(1, 30);
         final List<IndexId> indices = new ArrayList<>(numIndices);
@@ -288,8 +349,14 @@ public class RepositoryDataTests extends ESTestCase {
                     builder.put(someIndex, j, uuid);
                 }
             }
-            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()),
-                randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
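+            // Same synthetic bookkeeping as in testAddSnapshots above: one identifier per index, each mapped to a random blob UUID.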
+            final Map<IndexId, String> indexLookup =
+                someIndices.stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256)));
+            repositoryData = repositoryData.addSnapshot(
+                snapshotId, randomFrom(SnapshotState.values()),
+                randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
+                builder.build(),
+                indexLookup,
+                indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))));
         }
         return repositoryData;
     }
diff --git a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
index ba142f6cafe..c086bbb42e0 100644
--- a/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
+++ b/server/src/test/java/org/elasticsearch/repositories/blobstore/BlobStoreRepositoryTests.java
@@ -200,7 +200,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         RepositoryData repositoryData = generateRandomRepoData();
         final long startingGeneration = repositoryData.getGenId();
         final PlainActionFuture<RepositoryData> future1 = PlainActionFuture.newFuture();
-        repository.writeIndexGen(repositoryData, startingGeneration, true, Function.identity(),future1);
+        repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(),future1);
 
         // write repo data again to index generational file, errors because we already wrote to the
         // N+1 generation from which this repository data instance was created
@@ -241,8 +241,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
     }
 
     private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception {
-        PlainActionFuture.get(f -> repository.writeIndexGen(repositoryData, generation, true,
-            Function.identity(), f));
+        PlainActionFuture.get(
+            f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), f));
     }
 
     private BlobStoreRepository setupRepo() {
@@ -273,8 +273,13 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
             for (int j = 0; j < numIndices; j++) {
                 builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1");
             }
+            final ShardGenerations shardGenerations = builder.build();
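+            // Synthetic metadata identifiers again: one per index, each pointing at a random blob UUID.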
+            final Map<IndexId, String> indexLookup =
+                shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256)));
             repoData = repoData.addSnapshot(snapshotId,
-                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, builder.build());
+                randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, shardGenerations,
+                indexLookup,
+                indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))));
         }
         return repoData;
     }
diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
index e002fc197b2..3e81f15bfd8 100644
--- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
+++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java
@@ -778,7 +778,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
                 assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
                 if (snapshotInfo.successfulShards() == indices + 1) {
                     final IndexMetadata indexMetadata =
-                        repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                        repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
                     // Make sure we snapshotted the metadata of this index and not the recreated version
                     assertEquals(indexMetadata.getIndex(), firstIndex.get());
                 }
diff --git a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
index 9c9b144e94c..a3d046efb40 100644
--- a/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
+++ b/test/framework/src/main/java/org/elasticsearch/index/shard/RestoreOnlyRepository.java
@@ -33,6 +33,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
@@ -40,7 +41,6 @@ import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotShardFailure;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -87,7 +87,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     }
 
     @Override
-    public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) {
         return null;
     }
 
@@ -95,7 +95,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         final IndexId indexId = new IndexId(indexName, "blah");
         listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
-            Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY));
+            Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY));
     }
 
     @Override
diff --git a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
index c3812602baf..a8b664b345b 100644
--- a/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
+++ b/test/framework/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreTestUtil.java
@@ -61,6 +61,7 @@ import java.nio.file.NoSuchFileException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -122,7 +123,7 @@ public final class BlobStoreTestUtil {
                                LoggingDeprecationHandler.INSTANCE, blob)) {
                     repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false);
                 }
-                assertIndexUUIDs(blobContainer, repositoryData);
+                assertIndexUUIDs(repository, repositoryData);
                 assertSnapshotUUIDs(repository, repositoryData);
                 assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
                 return null;
@@ -165,10 +166,10 @@ public final class BlobStoreTestUtil {
         }
     }
 
-    private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
+    private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException {
         final List<String> expectedIndexUUIDs =
             repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
-        final BlobContainer indicesContainer = repoRoot.children().get("indices");
+        final BlobContainer indicesContainer = repository.blobContainer().children().get("indices");
         final List<String> foundIndexUUIDs;
         if (indicesContainer == null) {
             foundIndexUUIDs = Collections.emptyList();
@@ -178,6 +179,21 @@ public final class BlobStoreTestUtil {
                 s -> s.startsWith("extra") == false).collect(Collectors.toList());
         }
         assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
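+        // With metadata deduplication an index folder may hold meta-{uuid}.dat blobs referenced by more than one snapshot;
+        // assert that every generation referenced by RepositoryData actually exists in the folder.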
+        for (String indexId : foundIndexUUIDs) {
+            final Set<String> indexMetaGenerationsFound = indicesContainer.children().get(indexId)
+                .listBlobsByPrefix(BlobStoreRepository.METADATA_PREFIX).keySet().stream()
+                .map(p -> p.replace(BlobStoreRepository.METADATA_PREFIX, "").replace(".dat", ""))
+                .collect(Collectors.toSet());
+            final Set<String> indexMetaGenerationsExpected = new HashSet<>();
+            final IndexId idx =
+                repositoryData.getIndices().values().stream().filter(i -> i.getId().equals(indexId)).findFirst().get();
+            for (SnapshotId snapshotId : repositoryData.getSnapshots(idx)) {
+                indexMetaGenerationsExpected.add(repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, idx));
+            }
+            // TODO: assertEquals(indexMetaGenerationsExpected, indexMetaGenerationsFound); requires cleanup functionality for
+            //       index meta generations blobs
+            assertTrue(indexMetaGenerationsFound.containsAll(indexMetaGenerationsExpected));
+        }
     }
 
     private static void assertSnapshotUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException {
@@ -208,8 +224,9 @@ public final class BlobStoreTestUtil {
                 assertThat(indices, hasKey(indexId.getId()));
                 final BlobContainer indexContainer = indices.get(indexId.getId());
                 assertThat(indexContainer.listBlobs(),
-                    hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, snapshotId.getUUID())));
-                final IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotId, indexId);
+                    hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT,
+                        repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId))));
+                final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
                 for (Map.Entry<String, BlobContainer> entry : indexContainer.children().entrySet()) {
                     // Skip Lucene MockFS extraN directory
                     if (entry.getKey().startsWith("extra")) {
diff --git a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
index d36c69d0c6b..3268318f5f7 100644
--- a/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
+++ b/test/framework/src/main/java/org/elasticsearch/snapshots/AbstractSnapshotIntegTestCase.java
@@ -53,7 +53,6 @@ import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.test.VersionUtils;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;
 
 import java.io.IOException;
@@ -325,8 +324,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         logger.info("--> writing downgraded RepositoryData for repository metadata version [{}]", version);
         final RepositoryData repositoryData = getRepositoryData(repoName);
         final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
-        final boolean writeShardGens = version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
-        repositoryData.snapshotsToXContent(jsonBuilder, writeShardGens);
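+        // Serializing against the older version omits the newer BwC fields (shard and index metadata generations), producing a
+        // legacy-format index-N blob.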
+        repositoryData.snapshotsToXContent(jsonBuilder, version);
         final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
             NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
             BytesReference.bytes(jsonBuilder).streamInput()),
             repositoryData.getGenId(), randomBoolean());
         Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
             BytesReference.toBytes(BytesReference.bytes(
-                downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens))),
+                downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), version))),
             StandardOpenOption.TRUNCATE_EXISTING);
         return oldVersionSnapshot;
     }
diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
index 755683bf651..f21a432ce07 100644
--- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
+++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/repository/CcrRepository.java
@@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.MultiChunkTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
@@ -193,7 +194,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     }
 
     @Override
-    public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) {
         assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
         String leaderIndex = index.getName();
         Client remoteClient = getRemoteClusterClient();
@@ -256,7 +257,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
                 Index index = remoteIndices.get(indexName).getIndex();
                 indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId));
             }
-            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, ShardGenerations.EMPTY);
+            return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots,
+                ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
         });
     }
 
diff --git a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
index 91140d57f2e..43f7f97cd57 100644
--- a/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
+++ b/x-pack/plugin/core/src/test/java/org/elasticsearch/snapshots/SourceOnlySnapshotShardTests.java
@@ -237,7 +237,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             ShardRoutingState.INITIALIZING, new RecoverySource.SnapshotRecoverySource(
                 UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId));
-        IndexMetadata metadata = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetadata(snapshotId, indexId));
+        IndexMetadata metadata = runAsSnapshot(threadPool, () ->
+            repository.getSnapshotIndexMetaData(PlainActionFuture.get(repository::getRepositoryData), snapshotId, indexId));
         IndexShard restoredShard = newShard(
             shardRouting, metadata, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}, RetentionLeaseSyncer.EMPTY);
         DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT);