This PR introduces two new fields into `RepositoryData` (index-N) that track the blob name and content of `IndexMetaData` blobs by recording generations and uuids. They are used to deduplicate the `IndexMetaData` blobs (`meta-{uuid}.dat` in the index folders under `/indices`) so that new metadata for an index is only written to the repository during a snapshot if the same metadata cannot be found in any other snapshot. This saves one write per index in the common case of unchanged metadata, reducing cost and making snapshot finalization drastically faster when many indices are snapshotted at the same time. The implementation is mostly analogous to that for shard generations in #46250 and piggybacks on the BwC mechanism introduced in that PR (which means this PR needs adjustments if it doesn't go into `7.6`).
Relates to #45736 as it improves the efficiency of snapshotting unchanged indices.
Relates to #49800 as it makes concurrently loading the index metadata for multiple snapshots of the same index much more efficient, speeding up future concurrent snapshot deletes.
This commit is contained in:
parent
0d2ea1b881
commit
d456f7870a
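In outline, finalization now computes an identifier for each index's `IndexMetadata` and only writes a new `meta-{uuid}.dat` blob when no existing blob carries the same identifier. The sketch below illustrates that flow against the API added in this PR; the variable names and the `writeIndexMetaBlob` helper are illustrative only — the real wiring lives in `BlobStoreRepository#finalizeSnapshot`, which is not part of this excerpt:

// Sketch only: clusterState, writeIndexMetaBlob etc. are illustrative stand-ins.
final Map<IndexId, String> indexMetas = new HashMap<>();    // index -> metadata identifier
final Map<String, String> newIdentifiers = new HashMap<>(); // identifier -> newly written blob uuid
for (IndexId index : shardGenerations.indices()) {
    final IndexMetadata indexMetadata = clusterState.metadata().index(index.getName());
    final String identifier = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetadata);
    if (repositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifier) == null) {
        // Metadata changed (or first snapshot of this index): write one new meta-{uuid}.dat blob.
        final String blobUUID = UUIDs.base64UUID();
        writeIndexMetaBlob(index, blobUUID, indexMetadata); // hypothetical helper
        newIdentifiers.put(identifier, blobUUID);
    }
    indexMetas.put(index, identifier); // reuse the existing blob when the identifier is known
}
// Record the (possibly reused) identifiers in the new RepositoryData for this snapshot.
final RepositoryData updated = repositoryData.addSnapshot(
    snapshotId, snapshotState, version, shardGenerations, indexMetas, newIdentifiers);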
@@ -160,8 +160,8 @@ public class S3BlobStoreRepositoryTests extends ESMockAPIBasedRepositoryIntegTes
         final RepositoryData repositoryData = getRepositoryData(repository);
         final RepositoryData modifiedRepositoryData = repositoryData.withVersions(Collections.singletonMap(fakeOldSnapshot,
             SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.minimumCompatibilityVersion()));
-        final BytesReference serialized =
-            BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), false));
+        final BytesReference serialized = BytesReference.bytes(modifiedRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(),
+            SnapshotsService.OLD_SNAPSHOT_FORMAT));
         PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.run(f, () -> {
             try (InputStream stream = serialized.streamInput()) {
                 repository.blobStore().blobContainer(repository.basePath()).writeBlobAtomic(
@@ -218,7 +218,7 @@ public class MultiVersionRepositoryAccessIT extends ESRestTestCase {
                 ensureSnapshotRestoreWorks(repoName, "snapshot-2", shards);
             }
         } else {
-            if (SnapshotsService.useShardGenerations(minimumNodeVersion()) == false) {
+            if (SnapshotsService.useIndexGenerations(minimumNodeVersion()) == false) {
                 assertThat(TEST_STEP, is(TestStep.STEP3_OLD_CLUSTER));
                 final List<Class<? extends Exception>> expectedExceptions =
                     Arrays.asList(ResponseException.class, ElasticsearchStatusException.class);
@@ -33,6 +33,7 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
@@ -273,11 +274,12 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
                 SnapshotId::getUUID, Function.identity())),
             repositoryData.getSnapshotIds().stream().collect(Collectors.toMap(
                 SnapshotId::getUUID, repositoryData::getSnapshotState)),
-            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+            Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);

         Files.write(repo.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + withoutVersions.getGenId()),
-            BytesReference.toBytes(BytesReference.bytes(withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(),
-                true))), StandardOpenOption.TRUNCATE_EXISTING);
+            BytesReference.toBytes(BytesReference.bytes(
+                withoutVersions.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
+            StandardOpenOption.TRUNCATE_EXISTING);

         logger.info("--> verify that repo is assumed in old metadata format");
         final SnapshotsService snapshotsService = internalCluster().getCurrentMasterNodeInstance(SnapshotsService.class);
@@ -403,11 +405,12 @@ public class CorruptedBlobStoreRepositoryIT extends AbstractSnapshotIntegTestCas
                 Collectors.toMap(SnapshotId::getUUID, repositoryData1::getVersion)),
             repositoryData1.getIndices().values().stream().collect(
                 Collectors.toMap(Function.identity(), repositoryData1::getSnapshots)
-            ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build()
+            ), ShardGenerations.builder().putAll(repositoryData1.shardGenerations()).put(indexId, 0, "0").build(),
+            repositoryData1.indexMetaDataGenerations()
         );
         Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData1.getGenId()),
             BytesReference.toBytes(BytesReference.bytes(
-                brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), true))),
+                brokenRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), Version.CURRENT))),
             StandardOpenOption.TRUNCATE_EXISTING);

         logger.info("--> recreating repository to clear caches");
@@ -73,9 +73,11 @@ import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.RepositoryMissingException;
+import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
 import org.elasticsearch.rest.AbstractRestChannel;
 import org.elasticsearch.rest.RestRequest;
 import org.elasticsearch.rest.RestResponse;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
 import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
 import org.elasticsearch.snapshots.mockstore.MockRepository;
@@ -996,6 +998,8 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest

         SnapshotStats stats = snapshots.get(0).getStats();

+        final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
+        assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
         assertThat(stats.getTotalFileCount(), greaterThanOrEqualTo(snapshot0FileCount));
         assertThat(stats.getTotalSize(), greaterThanOrEqualTo(snapshot0FileSize));

@@ -1023,6 +1027,10 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
             .get();

         final List<Path> snapshot1Files = scanSnapshotFolder(repoPath);
+        final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);
+
+        // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
+        assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));

         final int snapshot1FileCount = snapshot1Files.size();
         final long snapshot1FileSize = calculateTotalFilesSize(snapshot1Files);
@@ -1047,6 +1055,65 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         assertThat(anotherStats.getTotalSize(), greaterThanOrEqualTo(snapshot1FileSize));
     }

+    public void testDeduplicateIndexMetadata() throws Exception {
+        final String indexName = "test-blocks-1";
+        final String repositoryName = "repo-" + indexName;
+        final String snapshot0 = "snapshot-0";
+        final String snapshot1 = "snapshot-1";
+        final String snapshot2 = "snapshot-2";
+
+        createIndex(indexName);
+
+        int docs = between(10, 100);
+        for (int i = 0; i < docs; i++) {
+            client().prepareIndex(indexName, "_doc").setSource("test", "init").execute().actionGet();
+        }
+
+        final Path repoPath = randomRepoPath();
+        createRepository(repositoryName, "fs", repoPath);
+
+        logger.info("--> create a snapshot");
+        client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot0)
+            .setIncludeGlobalState(true)
+            .setWaitForCompletion(true)
+            .get();
+
+        final List<Path> snapshot0IndexMetaFiles = findRepoMetaBlobs(repoPath);
+        assertThat(snapshot0IndexMetaFiles, hasSize(1)); // snapshotting a single index
+
+        docs = between(1, 5);
+        for (int i = 0; i < docs; i++) {
+            client().prepareIndex(indexName, "_doc").setSource("test", "test" + i).execute().actionGet();
+        }
+
+        logger.info("--> restart random data node and add new data node to change index allocation");
+        internalCluster().restartRandomDataNode();
+        internalCluster().startDataOnlyNode();
+        ensureGreen(indexName);
+
+        assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot1).setWaitForCompletion(true).get().status(),
+            equalTo(RestStatus.OK));
+
+        final List<Path> snapshot1IndexMetaFiles = findRepoMetaBlobs(repoPath);
+
+        // The IndexMetadata did not change between snapshots, verify that no new redundant IndexMetaData was written to the repository
+        assertThat(snapshot1IndexMetaFiles, is(snapshot0IndexMetaFiles));

+        // index to some other field to trigger a change in index metadata
+        for (int i = 0; i < docs; i++) {
+            client().prepareIndex(indexName, "_doc").setSource("new_field", "test" + i).execute().actionGet();
+        }
+        assertThat(client().admin().cluster().prepareCreateSnapshot(repositoryName, snapshot2).setWaitForCompletion(true).get().status(),
+            equalTo(RestStatus.OK));
+
+        final List<Path> snapshot2IndexMetaFiles = findRepoMetaBlobs(repoPath);
+        assertThat(snapshot2IndexMetaFiles, hasSize(2)); // should have created one new metadata blob
+
+        assertAcked(client().admin().cluster().prepareDeleteSnapshot(repositoryName, snapshot0, snapshot1).get());
+        final List<Path> snapshot3IndexMetaFiles = findRepoMetaBlobs(repoPath);
+        assertThat(snapshot3IndexMetaFiles, hasSize(1)); // should have deleted the metadata blob referenced by the first two snapshots
+    }
+
     public void testDataNodeRestartWithBusyMasterDuringSnapshot() throws Exception {
         logger.info("--> starting a master node and two data nodes");
         internalCluster().startMasterOnlyNode();
@@ -1256,6 +1323,22 @@ public class DedicatedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTest
         }).sum();
     }

+    private static List<Path> findRepoMetaBlobs(Path repoPath) throws IOException {
+        List<Path> files = new ArrayList<>();
+        Files.walkFileTree(repoPath.resolve("indices"), new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    final String fileName = file.getFileName().toString();
+                    if (fileName.startsWith(BlobStoreRepository.METADATA_PREFIX) && fileName.endsWith(".dat")) {
+                        files.add(file);
+                    }
+                    return super.visitFile(file, attrs);
+                }
+            }
+        );
+        return files;
+    }
+
     private List<Path> scanSnapshotFolder(Path repoPath) throws IOException {
         List<Path> files = new ArrayList<>();
         Files.walkFileTree(repoPath, new SimpleFileVisitor<Path>(){
@@ -23,6 +23,7 @@ import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRes
 import org.elasticsearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotResponse;
 import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse;
+import org.elasticsearch.action.support.PlainActionFuture;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.cluster.metadata.RepositoryMetadata;
@@ -34,6 +35,7 @@ import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.repositories.IndexId;
 import org.elasticsearch.repositories.RepositoriesService;
 import org.elasticsearch.repositories.Repository;
+import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.snapshots.mockstore.MockRepository;

@@ -198,9 +200,10 @@ public class MetadataLoadingDuringSnapshotRestoreIT extends AbstractSnapshotInte
         }

         @Override
-        public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId indexId) throws IOException {
+        public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId,
+                                                      IndexId indexId) throws IOException {
             indicesMetadata.computeIfAbsent(key(snapshotId.getName(), indexId.getName()), (s) -> new AtomicInteger(0)).incrementAndGet();
-            return super.getSnapshotIndexMetadata(snapshotId, indexId);
+            return super.getSnapshotIndexMetaData(PlainActionFuture.get(this::getRepositoryData), snapshotId, indexId);
         }
     }

@@ -2546,7 +2546,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
         final IndexId corruptedIndex = randomFrom(indexIds.values());
         final Path indexMetadataPath = repo.resolve("indices")
             .resolve(corruptedIndex.getId())
-            .resolve("meta-" + snapshotInfo.snapshotId().getUUID() + ".dat");
+            .resolve(
+                "meta-" + repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotInfo.snapshotId(), corruptedIndex) + ".dat");

         // Truncate the index metadata file
         try(SeekableByteChannel outChan = Files.newByteChannel(indexMetadataPath, StandardOpenOption.WRITE)) {
@@ -334,7 +334,7 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
         final Map<ShardId, IndexShardSnapshotStatus> shardStatus = new HashMap<>();
         for (String index : snapshotInfo.indices()) {
             IndexId indexId = repositoryData.resolveIndexId(index);
-            IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), indexId);
+            IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), indexId);
             if (indexMetadata != null) {
                 int numberOfShards = indexMetadata.getNumberOfShards();
                 for (int i = 0; i < numberOfShards; i++) {
@@ -70,8 +70,8 @@ public class FilterRepository implements Repository {
     }

     @Override
-    public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
-        return in.getSnapshotIndexMetadata(snapshotId, index);
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
+        return in.getSnapshotIndexMetaData(repositoryData, snapshotId, index);
     }

     @Override
@@ -0,0 +1,177 @@
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.repositories;
+
+import org.elasticsearch.cluster.metadata.IndexMetadata;
+import org.elasticsearch.common.Nullable;
+import org.elasticsearch.snapshots.SnapshotId;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Tracks the blob uuids of blobs containing {@link IndexMetadata} for snapshots as well as an identifier for each of these blobs.
+ * Before writing a new {@link IndexMetadata} blob during snapshot finalization in
+ * {@link org.elasticsearch.repositories.blobstore.BlobStoreRepository#finalizeSnapshot} the identifier for an instance of
+ * {@link IndexMetadata} should be computed and then used to check if it already exists in the repository via
+ * {@link #getIndexMetaBlobId(String)}.
+ */
+public final class IndexMetaDataGenerations {
+
+    public static final IndexMetaDataGenerations EMPTY = new IndexMetaDataGenerations(Collections.emptyMap(), Collections.emptyMap());
+
+    /**
+     * Map of {@link SnapshotId} to a map of the indices in a snapshot mapping {@link IndexId} to metadata identifiers.
+     * The identifiers in the nested map can be mapped to the relevant blob uuid via {@link #getIndexMetaBlobId}.
+     */
+    final Map<SnapshotId, Map<IndexId, String>> lookup;
+
+    /**
+     * Map of index metadata identifier to blob uuid.
+     */
+    final Map<String, String> identifiers;
+
+    IndexMetaDataGenerations(Map<SnapshotId, Map<IndexId, String>> lookup, Map<String, String> identifiers) {
+        assert identifiers.keySet().equals(lookup.values().stream().flatMap(m -> m.values().stream()).collect(Collectors.toSet())) :
+            "identifier mappings " + identifiers + " don't track the same blob ids as the lookup map " + lookup;
+        assert lookup.values().stream().noneMatch(Map::isEmpty) : "Lookup contained empty map [" + lookup + "]";
+        this.lookup = Collections.unmodifiableMap(lookup);
+        this.identifiers = Collections.unmodifiableMap(identifiers);
+    }
+
+    public boolean isEmpty() {
+        return identifiers.isEmpty();
+    }
+
+    /**
+     * Gets the blob id by the identifier of {@link org.elasticsearch.cluster.metadata.IndexMetadata}
+     * (computed via {@link #buildUniqueIdentifier}) or {@code null} if none is tracked for the identifier.
+     *
+     * @param metaIdentifier identifier for {@link IndexMetadata}
+     * @return blob id for the given metadata identifier or {@code null} if the identifier is not part of the repository yet
+     */
+    @Nullable
+    public String getIndexMetaBlobId(String metaIdentifier) {
+        return identifiers.get(metaIdentifier);
+    }
+
+    /**
+     * Get the blob id by {@link SnapshotId} and {@link IndexId} and fall back to the value of {@link SnapshotId#getUUID()} if none is
+     * known to enable backwards compatibility with versions older than
+     * {@link org.elasticsearch.snapshots.SnapshotsService#SHARD_GEN_IN_REPO_DATA_VERSION} which used the snapshot uuid as index metadata
+     * blob uuid.
+     *
+     * @param snapshotId Snapshot Id
+     * @param indexId    Index Id
+     * @return blob id for the given index metadata
+     */
+    public String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) {
+        final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId);
+        if (identifier == null) {
+            return snapshotId.getUUID();
+        } else {
+            return identifiers.get(identifier);
+        }
+    }
+
+    /**
+     * Create a new instance with the given snapshot and index metadata uuids and identifiers added.
+     *
+     * @param snapshotId     SnapshotId
+     * @param newLookup      new mappings of index + snapshot to index metadata identifier
+     * @param newIdentifiers new mappings of index metadata identifier to blob id
+     * @return instance with added snapshot
+     */
+    public IndexMetaDataGenerations withAddedSnapshot(SnapshotId snapshotId, Map<IndexId, String> newLookup,
+                                                      Map<String, String> newIdentifiers) {
+        final Map<SnapshotId, Map<IndexId, String>> updatedIndexMetaLookup = new HashMap<>(this.lookup);
+        final Map<String, String> updatedIndexMetaIdentifiers = new HashMap<>(identifiers);
+        updatedIndexMetaIdentifiers.putAll(newIdentifiers);
+        updatedIndexMetaLookup.compute(snapshotId, (snId, lookup) -> {
+            if (lookup == null) {
+                if (newLookup.isEmpty()) {
+                    return null;
+                }
+                return Collections.unmodifiableMap(new HashMap<>(newLookup));
+            } else {
+                final Map<IndexId, String> updated = new HashMap<>(lookup);
+                updated.putAll(newLookup);
+                return Collections.unmodifiableMap(updated);
+            }
+        });
+        return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers);
+    }
+
+    /**
+     * Create a new instance with the given snapshots removed.
+     *
+     * @param snapshotIds SnapshotIds to remove
+     * @return new instance without the given snapshots
+     */
+    public IndexMetaDataGenerations withRemovedSnapshots(Collection<SnapshotId> snapshotIds) {
+        final Map<SnapshotId, Map<IndexId, String>> updatedIndexMetaLookup = new HashMap<>(lookup);
+        updatedIndexMetaLookup.keySet().removeAll(snapshotIds);
+        final Map<String, String> updatedIndexMetaIdentifiers = new HashMap<>(identifiers);
+        updatedIndexMetaIdentifiers.keySet().removeIf(
+            k -> updatedIndexMetaLookup.values().stream().noneMatch(identifiers -> identifiers.containsValue(k)));
+        return new IndexMetaDataGenerations(updatedIndexMetaLookup, updatedIndexMetaIdentifiers);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(identifiers, lookup);
+    }
+
+    @Override
+    public boolean equals(Object that) {
+        if (this == that) {
+            return true;
+        }
+        if (that instanceof IndexMetaDataGenerations == false) {
+            return false;
+        }
+        final IndexMetaDataGenerations other = (IndexMetaDataGenerations) that;
+        return lookup.equals(other.lookup) && identifiers.equals(other.identifiers);
+    }
+
+    @Override
+    public String toString() {
+        return "IndexMetaDataGenerations{lookup:" + lookup + "}{identifier:" + identifiers + "}";
+    }
+
+    /**
+     * Compute identifier for {@link IndexMetadata} from its index- and history-uuid as well as its settings-, mapping- and alias-version.
+     * If an index did not see a change in its settings, mappings or aliases between two points in time then the identifier will not change
+     * between them either.
+     *
+     * @param indexMetaData IndexMetaData
+     * @return identifier string
+     */
+    public static String buildUniqueIdentifier(IndexMetadata indexMetaData) {
+        return indexMetaData.getIndexUUID() +
+            "-" + indexMetaData.getSettings().get(IndexMetadata.SETTING_HISTORY_UUID, IndexMetadata.INDEX_UUID_NA_VALUE) +
+            "-" + indexMetaData.getSettingsVersion() + "-" + indexMetaData.getMappingVersion() +
+            "-" + indexMetaData.getAliasesVersion();
+    }
+}
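Reads resolve blob names through `indexMetaBlobId`, which keeps old repositories working: snapshots written before this change have no lookup entry, so the method falls back to the snapshot uuid and the legacy blob name is used. A minimal sketch of resolving a blob name for a read (values illustrative):

// Resolve the IndexMetadata blob for a (snapshot, index) pair.
final String blobId = repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId);
final String blobName = "meta-" + blobId + ".dat"; // lives under /indices/{indexId.getId()}/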
@@ -102,11 +102,12 @@ public interface Repository extends LifecycleComponent {
     /**
      * Returns the index metadata associated with the snapshot.
      *
+     * @param repositoryData current {@link RepositoryData}
      * @param snapshotId the snapshot id to load the index metadata from
      * @param index      the {@link IndexId} to load the metadata from
      * @return the index metadata about the given index for the given snapshot
      */
-    IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException;
+    IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException;

     /**
      * Returns a {@link RepositoryData} to describe the data in the repository, including the snapshots
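Since `getSnapshotIndexMetaData` now needs the current `RepositoryData` to resolve the deduplicated blob name, callers fetch it first, as the `TransportSnapshotsStatusAction` hunk above does. A sketch of the call pattern (error handling elided, listener-based `getRepositoryData` as used by the tests in this diff):

repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
    final IndexMetadata indexMetadata =
        repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
    // ... use indexMetadata ...
}, e -> { /* handle failure */ }));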
@@ -40,6 +40,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;

@@ -67,8 +68,8 @@ public final class RepositoryData {
     /**
      * An instance initialized for an empty repository.
      */
-    public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN,
-        Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+    public static final RepositoryData EMPTY = new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(),
+        Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);

     /**
      * The generational id of the index file from which the repository data was read.
@@ -93,6 +94,11 @@ public final class RepositoryData {

     private final Map<String, Version> snapshotVersions;

+    /**
+     * Index metadata generations.
+     */
+    private final IndexMetaDataGenerations indexMetaDataGenerations;
+
     /**
      * Shard generations.
      */
@@ -100,7 +106,7 @@ public final class RepositoryData {

     public RepositoryData(long genId, Map<String, SnapshotId> snapshotIds, Map<String, SnapshotState> snapshotStates,
                           Map<String, Version> snapshotVersions, Map<IndexId, List<SnapshotId>> indexSnapshots,
-                          ShardGenerations shardGenerations) {
+                          ShardGenerations shardGenerations, IndexMetaDataGenerations indexMetaDataGenerations) {
         this.genId = genId;
         this.snapshotIds = Collections.unmodifiableMap(snapshotIds);
         this.snapshotStates = Collections.unmodifiableMap(snapshotStates);
@@ -108,6 +114,7 @@ public final class RepositoryData {
             .collect(Collectors.toMap(IndexId::getName, Function.identity())));
         this.indexSnapshots = Collections.unmodifiableMap(indexSnapshots);
         this.shardGenerations = Objects.requireNonNull(shardGenerations);
+        this.indexMetaDataGenerations = indexMetaDataGenerations;
         this.snapshotVersions = snapshotVersions;
         assert indices.values().containsAll(shardGenerations.indices()) : "ShardGenerations contained indices "
             + shardGenerations.indices() + " but snapshots only reference indices " + indices.values();
@@ -116,7 +123,8 @@ public final class RepositoryData {
     }

     protected RepositoryData copy() {
-        return new RepositoryData(genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
+        return new RepositoryData(
+            genId, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations);
     }

     /**
@@ -130,7 +138,8 @@ public final class RepositoryData {
         }
         final Map<String, Version> newVersions = new HashMap<>(snapshotVersions);
         versions.forEach((id, version) -> newVersions.put(id.getUUID(), version));
-        return new RepositoryData(genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations);
+        return new RepositoryData(
+            genId, snapshotIds, snapshotStates, newVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations);
     }

     public ShardGenerations shardGenerations() {
@@ -198,6 +207,32 @@ public final class RepositoryData {
         }).map(Map.Entry::getKey).collect(Collectors.toList());
     }

+    /**
+     * Returns a map of {@link IndexId} to a collection of {@link String} containing all the {@link IndexId} and the
+     * {@link org.elasticsearch.cluster.metadata.IndexMetadata} blob name in it that can be removed after removing the given snapshot from
+     * the repository.
+     * NOTE: Does not return a mapping for {@link IndexId} values that will be removed completely from the repository.
+     *
+     * @param snapshotIds SnapshotIds to remove
+     * @return map of index to index metadata blob id to delete
+     */
+    public Map<IndexId, Collection<String>> indexMetaDataToRemoveAfterRemovingSnapshots(Collection<SnapshotId> snapshotIds) {
+        Collection<IndexId> indicesForSnapshot = indicesToUpdateAfterRemovingSnapshot(snapshotIds);
+        final Set<String> allRemainingIdentifiers = indexMetaDataGenerations.lookup.entrySet().stream()
+            .filter(e -> snapshotIds.contains(e.getKey()) == false).flatMap(e -> e.getValue().values().stream())
+            .map(indexMetaDataGenerations::getIndexMetaBlobId).collect(Collectors.toSet());
+        final Map<IndexId, Collection<String>> toRemove = new HashMap<>();
+        for (IndexId indexId : indicesForSnapshot) {
+            for (SnapshotId snapshotId : snapshotIds) {
+                final String identifier = indexMetaDataGenerations.indexMetaBlobId(snapshotId, indexId);
+                if (allRemainingIdentifiers.contains(identifier) == false) {
+                    toRemove.computeIfAbsent(indexId, k -> new HashSet<>()).add(identifier);
+                }
+            }
+        }
+        return toRemove;
+    }
+
     /**
      * Add a snapshot and its indices to the repository; returns a new instance. If the snapshot
      * already exists in the repository data, this method throws an IllegalArgumentException.
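As a concrete (hypothetical) example of the method above: if `snapshot-1` and `snapshot-2` both map index `idx` to identifier `I1` (stored in blob `B1`) and `snapshot-3` maps it to `I2`/`B2`, then removing `snapshot-1` alone returns an empty map (`B1` is still referenced by `snapshot-2`), while removing both `snapshot-1` and `snapshot-2` returns `{idx=[B1]}` so the now-unreferenced blob can be deleted.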
@@ -206,11 +241,16 @@ public final class RepositoryData {
      * @param snapshotState    State of the new snapshot
      * @param shardGenerations Updated shard generations in the new snapshot. For each index contained in the snapshot an array of new
      *                         generations indexed by the shard id they correspond to must be supplied.
+     * @param indexMetaBlobs   Map of index metadata blob uuids
+     * @param newIdentifiers   Map of new index metadata blob uuids keyed by the identifiers of the
+     *                         {@link org.elasticsearch.cluster.metadata.IndexMetadata} in them
      */
     public RepositoryData addSnapshot(final SnapshotId snapshotId,
                                       final SnapshotState snapshotState,
                                       final Version version,
-                                      final ShardGenerations shardGenerations) {
+                                      final ShardGenerations shardGenerations,
+                                      @Nullable final Map<IndexId, String> indexMetaBlobs,
+                                      @Nullable final Map<String, String> newIdentifiers) {
         if (snapshotIds.containsKey(snapshotId.getUUID())) {
             // if the snapshot id already exists in the repository data, it means an old master
             // that is blocked from the cluster is trying to finalize a snapshot concurrently with
@@ -235,8 +275,23 @@ public final class RepositoryData {
                 allIndexSnapshots.put(indexId, Collections.unmodifiableList(copy));
             }
         }
+
+        final IndexMetaDataGenerations newIndexMetaGenerations;
+        if (indexMetaBlobs == null) {
+            assert newIdentifiers == null : "Non-null new identifiers [" + newIdentifiers + "] for null lookup";
+            assert indexMetaDataGenerations.lookup.isEmpty() :
+                "Index meta generations should have been empty but was [" + indexMetaDataGenerations + "]";
+            newIndexMetaGenerations = IndexMetaDataGenerations.EMPTY;
+        } else {
+            assert indexMetaBlobs.isEmpty() || shardGenerations.indices().equals(indexMetaBlobs.keySet()) :
+                "Shard generations contained indices " + shardGenerations.indices()
+                    + " but indexMetaData was given for " + indexMetaBlobs.keySet();
+            newIndexMetaGenerations = indexMetaDataGenerations.withAddedSnapshot(snapshotId, indexMetaBlobs, newIdentifiers);
+        }
+
         return new RepositoryData(genId, snapshots, newSnapshotStates, newSnapshotVersions, allIndexSnapshots,
-            ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build());
+            ShardGenerations.builder().putAll(this.shardGenerations).putAll(shardGenerations).build(),
+            newIndexMetaGenerations);
     }

     /**
@@ -249,7 +304,8 @@ public final class RepositoryData {
         if (newGeneration == genId) {
             return this;
         }
-        return new RepositoryData(newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations);
+        return new RepositoryData(
+            newGeneration, snapshotIds, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations, indexMetaDataGenerations);
     }

     /**
@@ -291,7 +347,8 @@ public final class RepositoryData {

         return new RepositoryData(genId, newSnapshotIds, newSnapshotStates, newSnapshotVersions, indexSnapshots,
             ShardGenerations.builder().putAll(shardGenerations).putAll(updatedShardGenerations)
-                .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build()
+                .retainIndicesAndPruneDeletes(indexSnapshots.keySet()).build(),
+            indexMetaDataGenerations.withRemovedSnapshots(snapshots)
         );
     }

@@ -320,12 +377,14 @@ public final class RepositoryData {
             && snapshotVersions.equals(that.snapshotVersions)
             && indices.equals(that.indices)
             && indexSnapshots.equals(that.indexSnapshots)
-            && shardGenerations.equals(that.shardGenerations);
+            && shardGenerations.equals(that.shardGenerations)
+            && indexMetaDataGenerations.equals(that.indexMetaDataGenerations);
     }

     @Override
     public int hashCode() {
-        return Objects.hash(snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations);
+        return Objects.hash(
+            snapshotIds, snapshotStates, snapshotVersions, indices, indexSnapshots, shardGenerations, indexMetaDataGenerations);
     }

     /**
@@ -366,6 +425,8 @@ public final class RepositoryData {
     }

     private static final String SHARD_GENERATIONS = "shard_generations";
+    private static final String INDEX_METADATA_IDENTIFIERS = "index_metadata_identifiers";
+    private static final String INDEX_METADATA_LOOKUP = "index_metadata_lookup";
     private static final String SNAPSHOTS = "snapshots";
     private static final String INDICES = "indices";
     private static final String INDEX_ID = "id";
@@ -378,10 +439,12 @@ public final class RepositoryData {
     /**
      * Writes the snapshots metadata and the related indices metadata to x-content.
      */
-    public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final boolean shouldWriteShardGens) throws IOException {
+    public XContentBuilder snapshotsToXContent(final XContentBuilder builder, final Version repoMetaVersion) throws IOException {
         builder.startObject();
         // write the snapshots list
         builder.startArray(SNAPSHOTS);
+        final boolean shouldWriteIndexGens = SnapshotsService.useIndexGenerations(repoMetaVersion);
+        final boolean shouldWriteShardGens = SnapshotsService.useShardGenerations(repoMetaVersion);
         for (final SnapshotId snapshot : getSnapshotIds()) {
             builder.startObject();
             builder.field(NAME, snapshot.getName());
@@ -389,6 +452,10 @@ public final class RepositoryData {
             if (snapshotStates.containsKey(snapshot.getUUID())) {
                 builder.field(STATE, snapshotStates.get(snapshot.getUUID()).value());
             }
+            if (shouldWriteIndexGens) {
+                builder.field(INDEX_METADATA_LOOKUP, indexMetaDataGenerations.lookup.getOrDefault(snapshot, Collections.emptyMap())
+                    .entrySet().stream().collect(Collectors.toMap(entry -> entry.getKey().getId(), Map.Entry::getValue)));
+            }
             if (snapshotVersions.containsKey(snapshot.getUUID())) {
                 builder.field(VERSION, snapshotVersions.get(snapshot.getUUID()).toString());
             }
@@ -417,7 +484,10 @@ public final class RepositoryData {
             builder.endObject();
         }
         builder.endObject();
-        if (shouldWriteShardGens) {
+        if (shouldWriteIndexGens) {
+            builder.field(MIN_VERSION, SnapshotsService.INDEX_GEN_IN_REPO_DATA_VERSION.toString());
+            builder.field(INDEX_METADATA_IDENTIFIERS, indexMetaDataGenerations.identifiers);
+        } else if (shouldWriteShardGens) {
             // Add min version field to make it impossible for older ES versions to deserialize this object
             builder.field(MIN_VERSION, SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION.toString());
         }
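With `shouldWriteIndexGens` enabled, the serialized index-N blob thus gains two things (field names per the constants above): a per-snapshot `index_metadata_lookup` object mapping each index id to its metadata identifier, and a root-level `index_metadata_identifiers` object mapping identifiers to blob uuids; `min_version` is set to `INDEX_GEN_IN_REPO_DATA_VERSION` so that older versions refuse to parse the new format.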
@@ -425,6 +495,10 @@ public final class RepositoryData {
         return builder;
     }

+    public IndexMetaDataGenerations indexMetaDataGenerations() {
+        return indexMetaDataGenerations;
+    }
+
     /**
      * Reads an instance of {@link RepositoryData} from x-content, loading the snapshots and indices metadata.
      *
@@ -438,6 +512,8 @@ public final class RepositoryData {
         final Map<String, Version> snapshotVersions = new HashMap<>();
         final Map<IndexId, List<SnapshotId>> indexSnapshots = new HashMap<>();
         final ShardGenerations.Builder shardGenerations = ShardGenerations.builder();
+        final Map<String, String> indexMetaIdentifiers = new HashMap<>();
+        final Map<SnapshotId, Map<String, String>> indexMetaLookup = new HashMap<>();

         if (parser.nextToken() == XContentParser.Token.START_OBJECT) {
             while (parser.nextToken() == XContentParser.Token.FIELD_NAME) {
@@ -448,6 +524,7 @@ public final class RepositoryData {
                     String name = null;
                     String uuid = null;
                     SnapshotState state = null;
+                    Map<String, String> metaGenerations = new HashMap<>();
                     Version version = null;
                     while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
                         String currentFieldName = parser.currentName();
@@ -458,6 +535,8 @@ public final class RepositoryData {
                             uuid = parser.text();
                         } else if (STATE.equals(currentFieldName)) {
                             state = SnapshotState.fromValue(parser.numberValue().byteValue());
+                        } else if (INDEX_METADATA_LOOKUP.equals(currentFieldName)) {
+                            metaGenerations.putAll(parser.mapStrings());
                         } else if (VERSION.equals(currentFieldName)) {
                             version = Version.fromString(parser.text());
                         }
@@ -470,6 +549,9 @@ public final class RepositoryData {
                         snapshotVersions.put(uuid, version);
                     }
                     snapshots.put(snapshotId.getUUID(), snapshotId);
+                    if (metaGenerations.isEmpty() == false) {
+                        indexMetaLookup.put(snapshotId, metaGenerations);
+                    }
                 }
             } else {
                 throw new ElasticsearchParseException("expected array for [" + field + "]");
@@ -545,6 +627,11 @@ public final class RepositoryData {
                         }
                     }
                 }
+            } else if (INDEX_METADATA_IDENTIFIERS.equals(field)) {
+                if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
+                    throw new ElasticsearchParseException("start object expected [" + INDEX_METADATA_IDENTIFIERS + "]");
+                }
+                indexMetaIdentifiers.putAll(parser.mapStrings());
             } else if (MIN_VERSION.equals(field)) {
                 if (parser.nextToken() != XContentParser.Token.VALUE_STRING) {
                     throw new ElasticsearchParseException("version string expected [min_version]");
@@ -558,7 +645,12 @@ public final class RepositoryData {
         } else {
             throw new ElasticsearchParseException("start object expected");
         }
-        return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build());
+        final Map<String, IndexId> indexLookup =
+            indexSnapshots.keySet().stream().collect(Collectors.toMap(IndexId::getId, Function.identity()));
+        return new RepositoryData(genId, snapshots, snapshotStates, snapshotVersions, indexSnapshots, shardGenerations.build(),
+            new IndexMetaDataGenerations(indexMetaLookup.entrySet().stream().collect(
+                Collectors.toMap(Map.Entry::getKey, e -> e.getValue().entrySet().stream()
+                    .collect(Collectors.toMap(entry -> indexLookup.get(entry.getKey()), Map.Entry::getValue)))), indexMetaIdentifiers));
     }

 }
@@ -79,6 +79,7 @@ import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
+import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -100,6 +101,7 @@ import org.elasticsearch.index.store.StoreFileMetadata;
 import org.elasticsearch.indices.recovery.RecoverySettings;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryCleanupResult;
 import org.elasticsearch.repositories.RepositoryData;
@@ -579,7 +581,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             // delete an index that was created by another master node after writing this index-N blob.
             final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
             doDeleteShardSnapshots(snapshotIds, repositoryStateId, foundIndices, rootBlobs, repositoryData,
-                SnapshotsService.useShardGenerations(repositoryMetaVersion), listener);
+                repositoryMetaVersion, listener);
         }

         @Override
@@ -639,10 +641,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * @param listener          Listener to invoke once finished
      */
     private void doDeleteShardSnapshots(Collection<SnapshotId> snapshotIds, long repositoryStateId, Map<String, BlobContainer> foundIndices,
-                                        Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, boolean writeShardGens,
+                                        Map<String, BlobMetadata> rootBlobs, RepositoryData repositoryData, Version repoMetaVersion,
                                         ActionListener<Void> listener) {

-        if (writeShardGens) {
+        if (SnapshotsService.useShardGenerations(repoMetaVersion)) {
             // First write the new shard state metadata (with the removed snapshot) and compute deletion targets
             final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeShardMetaDataAndComputeDeletesStep = new StepListener<>();
             writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, true, writeShardMetaDataAndComputeDeletesStep);
@@ -660,7 +662,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     builder.put(newGen.indexId, newGen.shardId, newGen.newGeneration);
                 }
                 final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, builder.build());
-                writeIndexGen(updatedRepoData, repositoryStateId, true, Function.identity(),
+                writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(),
                     ActionListener.wrap(v -> writeUpdatedRepoDataStep.onResponse(updatedRepoData), listener::onFailure));
             }, listener::onFailure);
             // Once we have updated the repository, run the clean-ups
@@ -669,12 +671,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 final ActionListener<Void> afterCleanupsListener =
                     new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
                 asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
-                asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(), afterCleanupsListener);
+                asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(),
+                    afterCleanupsListener);
             }, listener::onFailure);
         } else {
             // Write the new repository data first (with the removed snapshot), using no shard generations
             final RepositoryData updatedRepoData = repositoryData.removeSnapshots(snapshotIds, ShardGenerations.EMPTY);
-            writeIndexGen(updatedRepoData, repositoryStateId, false, Function.identity(), ActionListener.wrap(v -> {
+            writeIndexGen(updatedRepoData, repositoryStateId, repoMetaVersion, Function.identity(), ActionListener.wrap(v -> {
                 // Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
                 final ActionListener<Void> afterCleanupsListener =
                     new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
@@ -682,7 +685,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
                 writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
                 writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
-                    asyncCleanupUnlinkedShardLevelBlobs(snapshotIds, deleteResults, afterCleanupsListener),
+                    asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, deleteResults, afterCleanupsListener),
                     afterCleanupsListener::onFailure);
             }, listener::onFailure));
         }
@@ -696,14 +699,14 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
     }

-    private void asyncCleanupUnlinkedShardLevelBlobs(Collection<SnapshotId> snapshotIds,
+    private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection<SnapshotId> snapshotIds,
                                                      Collection<ShardSnapshotMetaDeleteResult> deleteResults,
                                                      ActionListener<Void> listener) {
         threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
             listener,
             l -> {
                 try {
-                    deleteFromContainer(blobContainer(), resolveFilesToDelete(snapshotIds, deleteResults));
+                    deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults));
                     l.onResponse(null);
                 } catch (Exception e) {
                     logger.warn(
@@ -735,14 +738,18 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final Set<SnapshotId> survivingSnapshots = oldRepositoryData.getSnapshots(indexId).stream()
                 .filter(id -> snapshotIds.contains(id) == false).collect(Collectors.toSet());
             final StepListener<Collection<Integer>> shardCountListener = new StepListener<>();
-            final ActionListener<Integer> allShardCountsListener = new GroupedActionListener<>(shardCountListener, snapshotIds.size());
-            for (SnapshotId snapshotId : snapshotIds) {
+            final Collection<String> indexMetaGenerations = snapshotIds.stream().map(
+                id -> oldRepositoryData.indexMetaDataGenerations().indexMetaBlobId(id, indexId)).collect(Collectors.toSet());
+            final ActionListener<Integer> allShardCountsListener =
+                new GroupedActionListener<>(shardCountListener, indexMetaGenerations.size());
+            final BlobContainer indexContainer = indexContainer(indexId);
+            for (String indexMetaGeneration : indexMetaGenerations) {
                 executor.execute(ActionRunnable.supply(allShardCountsListener, () -> {
                     try {
-                        return getSnapshotIndexMetadata(snapshotId, indexId).getNumberOfShards();
+                        return indexMetadataFormat.read(indexContainer, indexMetaGeneration).getNumberOfShards();
                     } catch (Exception ex) {
                         logger.warn(() -> new ParameterizedMessage(
-                            "[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
+                            "[{}] [{}] failed to read metadata for index", indexMetaGeneration, indexId.getName()), ex);
                         // Just invoke the listener without any shard generations to count it down, this index will be cleaned up
                         // by the stale data cleanup in the end.
                         // TODO: Getting here means repository corruption. We should find a way of dealing with this instead of just
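
Note on the hunk above: because deduplication means several of the deleted snapshots can point at the same index metadata blob, the shard counts are now read once per distinct meta-{uuid}.dat blob instead of once per snapshot, and the grouped listener is counted down by blob id. A minimal standalone sketch of that collapse, with hypothetical names standing in for the repository API:

import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;

class DistinctMetaReads {
    // Collapses (snapshot, index) pairs to the distinct set of metadata blob ids that must be read.
    static Set<String> distinctMetaBlobIds(Collection<String> snapshotUuids, String indexId,
                                           BiFunction<String, String, String> metaBlobIdLookup) {
        Set<String> blobIds = new HashSet<>();
        for (String snapshotUuid : snapshotUuids) {
            blobIds.add(metaBlobIdLookup.apply(snapshotUuid, indexId)); // duplicates collapse here
        }
        return blobIds;
    }

    public static void main(String[] args) {
        // Two snapshots sharing one metadata blob result in a single read for that blob.
        Map<String, String> blobBySnapshot = Map.of("snap-1", "meta-A", "snap-2", "meta-A", "snap-3", "meta-B");
        System.out.println(distinctMetaBlobIds(List.of("snap-1", "snap-2", "snap-3"), "idx",
            (snap, idx) -> blobBySnapshot.get(snap))); // e.g. [meta-A, meta-B]
    }
}

Keying the listener arithmetic by blob id rather than by snapshot is what keeps the count correct once blobs are shared.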
@@ -797,19 +804,21 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }

-    private List<String> resolveFilesToDelete(Collection<SnapshotId> snapshotIds,
+    private List<String> resolveFilesToDelete(RepositoryData oldRepositoryData, Collection<SnapshotId> snapshotIds,
                                               Collection<ShardSnapshotMetaDeleteResult> deleteResults) {
         final String basePath = basePath().buildAsString();
         final int basePathLen = basePath.length();
+        final Map<IndexId, Collection<String>> indexMetaGenerations =
+            oldRepositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(snapshotIds);
         return Stream.concat(
             deleteResults.stream().flatMap(shardResult -> {
                 final String shardPath =
                     shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
                 return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
             }),
-            deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().flatMap(indexId -> {
-                final String indexContainerPath = indexContainer(indexId).path().buildAsString();
-                return snapshotIds.stream().map(snapshotId -> indexContainerPath + globalMetadataFormat.blobName(snapshotId.getUUID()));
+            indexMetaGenerations.entrySet().stream().flatMap(entry -> {
+                final String indexContainerPath = indexContainer(entry.getKey()).path().buildAsString();
+                return entry.getValue().stream().map(id -> indexContainerPath + indexMetadataFormat.blobName(id));
             })
         ).map(absolutePath -> {
             assert absolutePath.startsWith(basePath);
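
The reference counting that decides which metadata blobs may actually be deleted lives in RepositoryData#indexMetaDataToRemoveAfterRemovingSnapshots: a blob only becomes a deletion target once no surviving snapshot still references it. A self-contained sketch of that idea, assuming a simple snapshot -> (index -> blob id) lookup (illustrative names only, not the real data structures):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class UnreferencedMetaBlobs {
    // Returns the metadata blob ids referenced by the removed snapshots that no surviving snapshot still uses.
    static Set<String> toRemove(Map<String, Map<String, String>> blobIdsBySnapshot, Set<String> removedSnapshots) {
        Set<String> candidates = new HashSet<>();
        for (String snapshot : removedSnapshots) {
            candidates.addAll(blobIdsBySnapshot.getOrDefault(snapshot, Map.of()).values());
        }
        for (Map.Entry<String, Map<String, String>> entry : blobIdsBySnapshot.entrySet()) {
            if (removedSnapshots.contains(entry.getKey()) == false) {
                candidates.removeAll(entry.getValue().values()); // still referenced by a survivor, keep it
            }
        }
        return candidates;
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> blobIdsBySnapshot = Map.of(
            "snap-1", Map.of("idx", "meta-A"),
            "snap-2", Map.of("idx", "meta-A"));
        System.out.println(toRemove(blobIdsBySnapshot, Set.of("snap-1")));           // [] (snap-2 still uses meta-A)
        System.out.println(toRemove(blobIdsBySnapshot, Set.of("snap-1", "snap-2"))); // [meta-A]
    }
}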
@@ -854,6 +863,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the
      * repository.
      * TODO: Add shard level cleanups
+     * TODO: Add unreferenced index metadata cleanup
      * <ul>
      *     <li>Deleting stale indices {@link #cleanupStaleIndices}</li>
      *     <li>Deleting unreferenced root level blobs {@link #cleanupStaleRootFiles}</li>
@@ -878,7 +888,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
         } else {
             // write new index-N blob to ensure concurrent operations will fail
-            writeIndexGen(repositoryData, repositoryStateId, SnapshotsService.useShardGenerations(repositoryMetaVersion),
+            writeIndexGen(repositoryData, repositoryStateId, repositoryMetaVersion,
                 Function.identity(), ActionListener.wrap(v -> cleanupStaleBlobs(Collections.emptyList(), foundIndices, rootBlobs,
                     repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new)), listener::onFailure));
         }
@@ -1002,23 +1012,40 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         final boolean writeShardGens = SnapshotsService.useShardGenerations(repositoryMetaVersion);
         final Consumer<Exception> onUpdateFailure =
             e -> listener.onFailure(new SnapshotException(metadata.name(), snapshotId, "failed to update snapshot in repository", e));

+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+
+        final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);
+
+        final StepListener<RepositoryData> repoDataListener = new StepListener<>();
+        executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData));
+        repoDataListener.whenComplete(existingRepositoryData -> {
+
+        final Map<IndexId, String> indexMetas;
+        final Map<String, String> indexMetaIdentifiers;
+        if (writeIndexGens) {
+            indexMetaIdentifiers = ConcurrentCollections.newConcurrentMap();
+            indexMetas = ConcurrentCollections.newConcurrentMap();
+        } else {
+            indexMetas = null;
+            indexMetaIdentifiers = null;
+        }
+
         final ActionListener<SnapshotInfo> allMetaListener = new GroupedActionListener<>(
             ActionListener.wrap(snapshotInfos -> {
                 assert snapshotInfos.size() == 1 : "Should have only received a single SnapshotInfo but received " + snapshotInfos;
                 final SnapshotInfo snapshotInfo = snapshotInfos.iterator().next();
-                getRepositoryData(ActionListener.wrap(existingRepositoryData -> {
-                    final RepositoryData updatedRepositoryData =
-                        existingRepositoryData.addSnapshot(snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations);
-                    writeIndexGen(updatedRepositoryData, repositoryStateId, writeShardGens, stateTransformer,
-                        ActionListener.wrap(writtenRepoData -> {
+                final RepositoryData updatedRepositoryData = existingRepositoryData.addSnapshot(
+                    snapshotId, snapshotInfo.state(), Version.CURRENT, shardGenerations, indexMetas, indexMetaIdentifiers);
+                writeIndexGen(updatedRepositoryData, repositoryStateId, repositoryMetaVersion, stateTransformer,
+                    ActionListener.wrap(
+                        newRepoData -> {
                             if (writeShardGens) {
                                 cleanupOldShardGens(existingRepositoryData, updatedRepositoryData);
                             }
-                            listener.onResponse(new Tuple<>(writtenRepoData, snapshotInfo));
-                        }, onUpdateFailure));
+                            listener.onResponse(new Tuple<>(newRepoData, snapshotInfo));
                         }, onUpdateFailure));
             }, onUpdateFailure), 2 + indices.size());
-        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);

         // We ignore all FileAlreadyExistsException when writing metadata since otherwise a master failover while in this method will
         // mean that no snap-${uuid}.dat blob is ever written for this snapshot. This is safe because any updated version of the
@@ -1026,16 +1053,31 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // Failing on an already existing index-${repoGeneration} below ensures that the index.latest blob is not updated in a way
         // that decrements the generation it points at

-        // Write Global Metadata
+        // Write Global MetaData
         executor.execute(ActionRunnable.run(allMetaListener,
             () -> globalMetadataFormat.write(clusterMetadata, blobContainer(), snapshotId.getUUID(), false)));

         // write the index metadata for each index in the snapshot
         for (IndexId index : indices) {
-            executor.execute(ActionRunnable.run(allMetaListener, () ->
-                indexMetadataFormat.write(clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false)));
+            executor.execute(ActionRunnable.run(allMetaListener, () -> {
+                    final IndexMetadata indexMetaData = clusterMetadata.index(index.getName());
+                    if (writeIndexGens) {
+                        final String identifiers = IndexMetaDataGenerations.buildUniqueIdentifier(indexMetaData);
+                        String metaUUID = existingRepositoryData.indexMetaDataGenerations().getIndexMetaBlobId(identifiers);
+                        if (metaUUID == null) {
+                            // We don't yet have this version of the metadata so we write it
+                            metaUUID = UUIDs.base64UUID();
+                            indexMetadataFormat.write(indexMetaData, indexContainer(index), metaUUID, false);
+                            indexMetaIdentifiers.put(identifiers, metaUUID);
+                        }
+                        indexMetas.put(index, identifiers);
+                    } else {
+                        indexMetadataFormat.write(
+                            clusterMetadata.index(index.getName()), indexContainer(index), snapshotId.getUUID(), false);
+                    }
+                }
+            ));
         }

         executor.execute(ActionRunnable.supply(allMetaListener, () -> {
             final SnapshotInfo snapshotInfo = new SnapshotInfo(snapshotId,
                 indices.stream().map(IndexId::getName).collect(Collectors.toList()),
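
This hunk is the core of the deduplication on the write path: derive an identifier from the index metadata content, consult the identifiers already tracked in RepositoryData, and only write a new meta-{uuid}.dat blob on a miss. A standalone sketch of the write-if-absent step, with hypothetical names standing in for the repository plumbing:

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

class MetaDedupWriter {
    // identifier (derived from the index metadata content) -> blob UUID already present in the repository
    private final Map<String, String> knownBlobs = new ConcurrentHashMap<>();

    // Returns the blob UUID for this metadata version, invoking the write only on a cache miss.
    String writeIfAbsent(String identifier, Consumer<String> writeBlobWithUuid) {
        return knownBlobs.computeIfAbsent(identifier, id -> {
            String uuid = UUID.randomUUID().toString();
            writeBlobWithUuid.accept(uuid); // write meta-{uuid}.dat only when this metadata is new
            return uuid;
        });
    }

    public static void main(String[] args) {
        MetaDedupWriter writer = new MetaDedupWriter();
        String first = writer.writeIfAbsent("settings+mappings-v1", uuid -> System.out.println("writing meta-" + uuid + ".dat"));
        String second = writer.writeIfAbsent("settings+mappings-v1", uuid -> System.out.println("never printed"));
        System.out.println(first.equals(second)); // true: the second snapshot reuses the existing blob
    }
}

In the common case of unchanged metadata the second call is the one taken, which is exactly the per-index write the change saves.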
@@ -1045,6 +1087,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             snapshotFormat.write(snapshotInfo, blobContainer(), snapshotId.getUUID(), false);
             return snapshotInfo;
         }));
+        }, onUpdateFailure);
     }

     // Delete all old shard gen blobs that aren't referenced any longer as a result from moving to updated repository data
@@ -1084,9 +1127,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }

     @Override
-    public IndexMetadata getSnapshotIndexMetadata(final SnapshotId snapshotId, final IndexId index) throws IOException {
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) throws IOException {
         try {
-            return indexMetadataFormat.read(indexContainer(index), snapshotId.getUUID());
+            return indexMetadataFormat.read(indexContainer(index),
+                repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, index));
         } catch (NoSuchFileException e) {
             throw new SnapshotMissingException(metadata.name(), snapshotId, e);
         }
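
Reads now resolve the blob name through RepositoryData instead of assuming meta-{snapshot-uuid}.dat. A sketch of that resolution; the fallback to the snapshot's own UUID for snapshots that predate this change is an assumption on my part, consistent with the BwC mechanism the PR description says it piggybacks on:

import java.util.Map;

class MetaBlobIdResolver {
    // Resolves the blob id that holds a snapshot's metadata for one index. The fallback to the snapshot's
    // own UUID models how pre-deduplication snapshots named their meta-{uuid}.dat blobs (an assumption here).
    static String indexMetaBlobId(Map<String, String> identifierBySnapshot,
                                  Map<String, String> blobIdByIdentifier, String snapshotUuid) {
        String identifier = identifierBySnapshot.get(snapshotUuid);
        if (identifier == null) {
            return snapshotUuid; // legacy snapshot: metadata was written under the snapshot UUID itself
        }
        return blobIdByIdentifier.get(identifier);
    }

    public static void main(String[] args) {
        Map<String, String> identifierBySnapshot = Map.of("new-snap", "settings+mappings-v1");
        Map<String, String> blobIdByIdentifier = Map.of("settings+mappings-v1", "dedup-uuid-1");
        System.out.println(indexMetaBlobId(identifierBySnapshot, blobIdByIdentifier, "new-snap")); // dedup-uuid-1
        System.out.println(indexMetaBlobId(identifierBySnapshot, blobIdByIdentifier, "old-snap")); // old-snap
    }
}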
@@ -1245,7 +1289,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     loaded = getRepositoryData(genToLoad);
                     // We can cache in the most recent version here without regard to the actual repository metadata version since we're
                     // only caching the information that we just wrote and thus won't accidentally cache any information that isn't safe
-                    cacheRepositoryData(loaded, true);
+                    cacheRepositoryData(loaded, Version.CURRENT);
                 }
                 listener.onResponse(loaded);
                 return;
@@ -1281,16 +1325,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * generation will always contain the same {@link RepositoryData}.
      *
      * @param updated RepositoryData to cache if newer than the cache contents
-     * @param writeShardGens whether to cache shard generation values
+     * @param version version of the repository metadata that was cached
      */
-    private void cacheRepositoryData(RepositoryData updated, boolean writeShardGens) {
+    private void cacheRepositoryData(RepositoryData updated, Version version) {
         if (cacheRepositoryData && bestEffortConsistency == false) {
             final BytesReference serialized;
             BytesStreamOutput out = new BytesStreamOutput();
             try {
                 try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out);
                      XContentBuilder builder = XContentFactory.jsonBuilder(tmp)) {
-                    updated.snapshotsToXContent(builder, writeShardGens);
+                    updated.snapshotsToXContent(builder, version);
                 }
                 serialized = out.bytes();
                 final int len = serialized.length();
@@ -1423,11 +1467,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      *
      * @param repositoryData RepositoryData to write
      * @param expectedGen    expected repository generation at the start of the operation
-     * @param writeShardGens whether to write {@link ShardGenerations} to the new {@link RepositoryData} blob
+     * @param version        version of the repository metadata to write
      * @param stateFilter    filter for the last cluster state update executed by this method
      * @param listener       completion listener
      */
-    protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, boolean writeShardGens,
+    protected void writeIndexGen(RepositoryData repositoryData, long expectedGen, Version version,
                                  Function<ClusterState, ClusterState> stateFilter, ActionListener<RepositoryData> listener) {
         assert isReadOnly() == false; // can not write to a read only repository
         final long currentGen = repositoryData.getGenId();
@@ -1538,7 +1582,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final String indexBlob = INDEX_FILE_PREFIX + Long.toString(newGen);
             logger.debug("Repository [{}] writing new index generational blob [{}]", metadata.name(), indexBlob);
             writeAtomic(indexBlob,
-                BytesReference.bytes(filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens)), true);
+                BytesReference.bytes(
+                    filteredRepositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version)), true);
             // write the current generation to the index-latest file
             final BytesReference genBytes;
             try (BytesStreamOutput bStream = new BytesStreamOutput()) {
@@ -1579,7 +1624,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                 final RepositoryData writtenRepositoryData = filteredRepositoryData.withGenId(newGen);
-                cacheRepositoryData(writtenRepositoryData, writeShardGens);
+                cacheRepositoryData(writtenRepositoryData, version);
                 threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.supply(listener, () -> {
                     // Delete all now outdated index files up to 1000 blobs back from the new generation.
                     // If there are more than 1000 dangling index-N cleanup functionality on repo delete will take care of them.

@@ -246,7 +246,7 @@ public class RestoreService implements ClusterStateApplier {

                         final List<IndexId> indexIdsInSnapshot = repositoryData.resolveIndices(indicesInSnapshot);
                         for (IndexId indexId : indexIdsInSnapshot) {
-                            metadataBuilder.put(repository.getSnapshotIndexMetadata(snapshotId, indexId), false);
+                            metadataBuilder.put(repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId), false);
                         }

                         final Metadata metadata = metadataBuilder.build();

@@ -122,6 +122,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus

     public static final Version SHARD_GEN_IN_REPO_DATA_VERSION = Version.V_7_6_0;

+    public static final Version INDEX_GEN_IN_REPO_DATA_VERSION = Version.V_7_9_0;
+
     public static final Version OLD_SNAPSHOT_FORMAT = Version.V_7_5_0;

     public static final Version MULTI_DELETE_VERSION = Version.V_7_8_0;
@@ -1505,7 +1507,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
     }

     /**
-     * Deletes snapshot from repository
+     * Checks whether the metadata version supports writing index metadata generations to the repository.
+     *
+     * @param repositoryMetaVersion version to check
+     * @return true if version supports index metadata generations in the repository metadata
+     */
+    public static boolean useIndexGenerations(Version repositoryMetaVersion) {
+        return repositoryMetaVersion.onOrAfter(INDEX_GEN_IN_REPO_DATA_VERSION);
+    }
+
+    /** Deletes snapshot from repository
      *
      * @param repoName repository name
      * @param snapshotIds snapshot ids

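
Like SHARD_GEN_IN_REPO_DATA_VERSION before it, the new constant gates the on-disk format on the repository metadata version, which in a mixed cluster is driven by the minimum node version. A toy sketch of such a gate; the integer version ids below are made-up stand-ins for org.elasticsearch.Version:

import java.util.List;

class RepoMetaVersionGate {
    // Hypothetical numeric stand-in for a 7.9.0 version id.
    static final int INDEX_GEN_IN_REPO_DATA_VERSION = 7_090_000;

    static boolean useIndexGenerations(int repositoryMetaVersion) {
        return repositoryMetaVersion >= INDEX_GEN_IN_REPO_DATA_VERSION;
    }

    // Evaluated against the minimum node version so a mixed cluster keeps writing the old format.
    static boolean useIndexGenerations(List<Integer> nodeVersions) {
        return useIndexGenerations(nodeVersions.stream().min(Integer::compare).orElseThrow());
    }

    public static void main(String[] args) {
        System.out.println(useIndexGenerations(List.of(7_090_000, 7_080_000))); // false: a 7.8 node is present
        System.out.println(useIndexGenerations(List.of(7_090_000, 7_100_000))); // true
    }
}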
@@ -49,7 +49,6 @@ import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportService;

-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -151,7 +150,7 @@ public class RepositoriesServiceTests extends ESTestCase {
         }

         @Override
-        public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
+        public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) {
             return null;
         }

@@ -34,6 +34,7 @@ import org.elasticsearch.test.ESTestCase;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -41,6 +42,8 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;

 import static org.elasticsearch.repositories.RepositoryData.EMPTY_REPO_GEN;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -74,7 +77,7 @@ public class RepositoryDataTests extends ESTestCase {
     public void testXContent() throws IOException {
         RepositoryData repositoryData = generateRandomRepoData();
         XContentBuilder builder = JsonXContent.contentBuilder();
-        repositoryData.snapshotsToXContent(builder, true);
+        repositoryData.snapshotsToXContent(builder, Version.CURRENT);
         try (XContentParser parser = createParser(JsonXContent.jsonXContent, BytesReference.bytes(builder))) {
             long gen = (long) randomIntBetween(0, 500);
             RepositoryData fromXContent = RepositoryData.snapshotsFromXContent(parser, gen, randomBoolean());
@@ -106,9 +109,14 @@ public class RepositoryDataTests extends ESTestCase {
             indices.add(indexId);
             builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
         }
+        final ShardGenerations shardGenerations = builder.build();
+        final Map<IndexId, String> indexLookup =
+            shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256)));
         RepositoryData newRepoData = repositoryData.addSnapshot(newSnapshot,
             randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED),
-            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
+            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
+            shardGenerations, indexLookup,
+            indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))));
         // verify that the new repository data has the new snapshot and its indices
         assertTrue(newRepoData.getSnapshotIds().contains(newSnapshot));
         for (IndexId indexId : indices) {
@@ -132,12 +140,12 @@ public class RepositoryDataTests extends ESTestCase {
             snapshotStates.put(snapshotId.getUUID(), randomFrom(SnapshotState.values()));
             snapshotVersions.put(snapshotId.getUUID(), randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()));
         }
-        RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds,
-            Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(), ShardGenerations.EMPTY);
+        RepositoryData repositoryData = new RepositoryData(EMPTY_REPO_GEN, snapshotIds, Collections.emptyMap(), Collections.emptyMap(),
+            Collections.emptyMap(), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
         // test that initializing indices works
         Map<IndexId, List<SnapshotId>> indices = randomIndices(snapshotIds);
-        RepositoryData newRepoData =
-            new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices, ShardGenerations.EMPTY);
+        RepositoryData newRepoData = new RepositoryData(repositoryData.getGenId(), snapshotIds, snapshotStates, snapshotVersions, indices,
+            ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
         List<SnapshotId> expected = new ArrayList<>(repositoryData.getSnapshotIds());
         Collections.sort(expected);
         List<SnapshotId> actual = new ArrayList<>(newRepoData.getSnapshotIds());
@@ -153,7 +161,8 @@ public class RepositoryDataTests extends ESTestCase {
         List<SnapshotId> snapshotIds = new ArrayList<>(repositoryData.getSnapshotIds());
         assertThat(snapshotIds.size(), greaterThan(0));
         SnapshotId removedSnapshotId = snapshotIds.remove(randomIntBetween(0, snapshotIds.size() - 1));
-        RepositoryData newRepositoryData = repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY);
+        RepositoryData newRepositoryData =
+            repositoryData.removeSnapshots(Collections.singleton(removedSnapshotId), ShardGenerations.EMPTY);
         // make sure the repository data's indices no longer contain the removed snapshot
         for (final IndexId indexId : newRepositoryData.getIndices().values()) {
             assertFalse(newRepositoryData.getSnapshots(indexId).contains(removedSnapshotId));
@@ -173,8 +182,9 @@ public class RepositoryDataTests extends ESTestCase {
     public void testGetSnapshotState() {
         final SnapshotId snapshotId = new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID());
         final SnapshotState state = randomFrom(SnapshotState.values());
-        final RepositoryData repositoryData = RepositoryData.EMPTY.addSnapshot(snapshotId, state,
-            randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), ShardGenerations.EMPTY);
+        final RepositoryData repositoryData =
+            RepositoryData.EMPTY.addSnapshot(snapshotId, state, randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
+                ShardGenerations.EMPTY, Collections.emptyMap(), Collections.emptyMap());
         assertEquals(state, repositoryData.getSnapshotState(snapshotId));
         assertNull(repositoryData.getSnapshotState(new SnapshotId(randomAlphaOfLength(8), UUIDs.randomBase64UUID())));
     }
@@ -184,7 +194,7 @@ public class RepositoryDataTests extends ESTestCase {
         final RepositoryData repositoryData = generateRandomRepoData();

         XContentBuilder builder = XContentBuilder.builder(xContent);
-        repositoryData.snapshotsToXContent(builder, true);
+        repositoryData.snapshotsToXContent(builder, Version.CURRENT);
         RepositoryData parsedRepositoryData;
         try (XContentParser xParser = createParser(builder)) {
             parsedRepositoryData = RepositoryData.snapshotsFromXContent(xParser, repositoryData.getGenId(), randomBoolean());
@@ -219,10 +229,10 @@ public class RepositoryDataTests extends ESTestCase {
         assertNotNull(corruptedIndexId);

         RepositoryData corruptedRepositoryData = new RepositoryData(parsedRepositoryData.getGenId(), snapshotIds, snapshotStates,
-            snapshotVersions, indexSnapshots, shardGenBuilder.build());
+            snapshotVersions, indexSnapshots, shardGenBuilder.build(), IndexMetaDataGenerations.EMPTY);

         final XContentBuilder corruptedBuilder = XContentBuilder.builder(xContent);
-        corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, true);
+        corruptedRepositoryData.snapshotsToXContent(corruptedBuilder, Version.CURRENT);

         try (XContentParser xParser = createParser(corruptedBuilder)) {
             ElasticsearchParseException e = expectThrows(ElasticsearchParseException.class, () ->
@@ -269,6 +279,57 @@ public class RepositoryDataTests extends ESTestCase {
         }
     }

+    // Test removing snapshot from random data where no two snapshots share any index metadata blobs
+    public void testIndexMetaDataToRemoveAfterRemovingSnapshotNoSharing() {
+        final RepositoryData repositoryData = generateRandomRepoData();
+        final SnapshotId snapshotId = randomFrom(repositoryData.getSnapshotIds());
+        final IndexMetaDataGenerations indexMetaDataGenerations = repositoryData.indexMetaDataGenerations();
+        final Collection<IndexId> indicesToUpdate = repositoryData.indicesToUpdateAfterRemovingSnapshot(Collections.singleton(snapshotId));
+        final Map<IndexId, Collection<String>> identifiersToRemove = indexMetaDataGenerations.lookup.get(snapshotId).entrySet().stream()
+            .filter(e -> indicesToUpdate.contains(e.getKey()))
+            .collect(Collectors.toMap(Map.Entry::getKey,
+                e -> Collections.singleton(indexMetaDataGenerations.getIndexMetaBlobId(e.getValue()))));
+        assertEquals(repositoryData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(snapshotId)), identifiersToRemove);
+    }
+
+    // Test removing snapshot from random data that has some or all index metadata shared
+    public void testIndexMetaDataToRemoveAfterRemovingSnapshotWithSharing() {
+        final RepositoryData repositoryData = generateRandomRepoData();
+        final ShardGenerations.Builder builder = ShardGenerations.builder();
+        final SnapshotId otherSnapshotId = randomFrom(repositoryData.getSnapshotIds());
+        final Collection<IndexId> indicesInOther = repositoryData.getIndices().values()
+            .stream()
+            .filter(index -> repositoryData.getSnapshots(index).contains(otherSnapshotId))
+            .collect(Collectors.toSet());
+        for (IndexId indexId : indicesInOther) {
+            builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
+        }
+        final Map<IndexId, String> newIndices = new HashMap<>();
+        final Map<String, String> newIdentifiers = new HashMap<>();
+        final Map<IndexId, Collection<String>> removeFromOther = new HashMap<>();
+        for (IndexId indexId : randomSubsetOf(repositoryData.getIndices().values())) {
+            if (indicesInOther.contains(indexId)) {
+                removeFromOther.put(indexId, Collections.singleton(
+                    repositoryData.indexMetaDataGenerations().indexMetaBlobId(otherSnapshotId, indexId)));
+            }
+            final String identifier = randomAlphaOfLength(20);
+            newIndices.put(indexId, identifier);
+            newIdentifiers.put(identifier, UUIDs.randomBase64UUID(random()));
+            builder.put(indexId, 0, UUIDs.randomBase64UUID(random()));
+        }
+        final ShardGenerations shardGenerations = builder.build();
+        final Map<IndexId, String> indexLookup = new HashMap<>(repositoryData.indexMetaDataGenerations().lookup.get(otherSnapshotId));
+        indexLookup.putAll(newIndices);
+        final SnapshotId newSnapshot = new SnapshotId(randomAlphaOfLength(7), UUIDs.randomBase64UUID(random()));
+
+        RepositoryData newRepoData =
+            repositoryData.addSnapshot(newSnapshot, SnapshotState.SUCCESS, Version.CURRENT, shardGenerations, indexLookup, newIdentifiers);
+        assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(newSnapshot)),
+            newIndices.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                e -> Collections.singleton(newIdentifiers.get(e.getValue())))));
+        assertEquals(newRepoData.indexMetaDataToRemoveAfterRemovingSnapshots(Collections.singleton(otherSnapshotId)), removeFromOther);
+    }
+
     public static RepositoryData generateRandomRepoData() {
         final int numIndices = randomIntBetween(1, 30);
         final List<IndexId> indices = new ArrayList<>(numIndices);
@@ -288,8 +349,14 @@ public class RepositoryDataTests extends ESTestCase {
                     builder.put(someIndex, j, uuid);
                 }
             }
-            repositoryData = repositoryData.addSnapshot(snapshotId, randomFrom(SnapshotState.values()),
-                randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()), builder.build());
+            final Map<IndexId, String> indexLookup =
+                someIndices.stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256)));
+            repositoryData = repositoryData.addSnapshot(
+                snapshotId, randomFrom(SnapshotState.values()),
+                randomFrom(Version.CURRENT, Version.CURRENT.minimumCompatibilityVersion()),
+                builder.build(),
+                indexLookup,
+                indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))));
         }
         return repositoryData;
     }

@@ -200,7 +200,7 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
         RepositoryData repositoryData = generateRandomRepoData();
         final long startingGeneration = repositoryData.getGenId();
         final PlainActionFuture<RepositoryData> future1 = PlainActionFuture.newFuture();
-        repository.writeIndexGen(repositoryData, startingGeneration, true, Function.identity(),future1);
+        repository.writeIndexGen(repositoryData, startingGeneration, Version.CURRENT, Function.identity(),future1);

         // write repo data again to index generational file, errors because we already wrote to the
         // N+1 generation from which this repository data instance was created
@@ -241,8 +241,8 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
     }

     private static void writeIndexGen(BlobStoreRepository repository, RepositoryData repositoryData, long generation) throws Exception {
-        PlainActionFuture.<RepositoryData, Exception>get(f -> repository.writeIndexGen(repositoryData, generation, true,
-            Function.identity(), f));
+        PlainActionFuture.<RepositoryData, Exception>get(
+            f -> repository.writeIndexGen(repositoryData, generation, Version.CURRENT, Function.identity(), f));
     }

     private BlobStoreRepository setupRepo() {
|
@ -273,8 +273,13 @@ public class BlobStoreRepositoryTests extends ESSingleNodeTestCase {
|
||||||
for (int j = 0; j < numIndices; j++) {
|
for (int j = 0; j < numIndices; j++) {
|
||||||
builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1");
|
builder.put(new IndexId(randomAlphaOfLength(8), UUIDs.randomBase64UUID()), 0, "1");
|
||||||
}
|
}
|
||||||
|
final ShardGenerations shardGenerations = builder.build();
|
||||||
|
final Map<IndexId, String> indexLookup =
|
||||||
|
shardGenerations.indices().stream().collect(Collectors.toMap(Function.identity(), ind -> randomAlphaOfLength(256)));
|
||||||
repoData = repoData.addSnapshot(snapshotId,
|
repoData = repoData.addSnapshot(snapshotId,
|
||||||
randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, builder.build());
|
randomFrom(SnapshotState.SUCCESS, SnapshotState.PARTIAL, SnapshotState.FAILED), Version.CURRENT, shardGenerations,
|
||||||
|
indexLookup,
|
||||||
|
indexLookup.values().stream().collect(Collectors.toMap(Function.identity(), ignored -> UUIDs.randomBase64UUID(random()))));
|
||||||
}
|
}
|
||||||
return repoData;
|
return repoData;
|
||||||
}
|
}
|
||||||
|
|
|
@@ -778,7 +778,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
         assertThat(snapshotInfo.successfulShards(), either(is(indices + 1)).or(is(indices)));
         if (snapshotInfo.successfulShards() == indices + 1) {
             final IndexMetadata indexMetadata =
-                repository.getSnapshotIndexMetadata(snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
+                repository.getSnapshotIndexMetaData(repositoryData, snapshotInfo.snapshotId(), repositoryData.resolveIndexId(index));
             // Make sure we snapshotted the metadata of this index and not the recreated version
             assertEquals(indexMetadata.getIndex(), firstIndex.get());
         }

@@ -33,6 +33,7 @@ import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.snapshots.IndexShardSnapshotStatus;
 import org.elasticsearch.index.store.Store;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;
@@ -40,7 +41,6 @@ import org.elasticsearch.snapshots.SnapshotId;
 import org.elasticsearch.snapshots.SnapshotInfo;
 import org.elasticsearch.snapshots.SnapshotShardFailure;

-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -87,7 +87,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     }

     @Override
-    public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) {
         return null;
     }

@@ -95,7 +95,7 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
     public void getRepositoryData(ActionListener<RepositoryData> listener) {
         final IndexId indexId = new IndexId(indexName, "blah");
         listener.onResponse(new RepositoryData(EMPTY_REPO_GEN, Collections.emptyMap(), Collections.emptyMap(), Collections.emptyMap(),
-            Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY));
+            Collections.singletonMap(indexId, emptyList()), ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY));
     }

     @Override

@@ -61,6 +61,7 @@ import java.nio.file.NoSuchFileException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -122,7 +123,7 @@ public final class BlobStoreTestUtil {
                     LoggingDeprecationHandler.INSTANCE, blob)) {
                 repositoryData = RepositoryData.snapshotsFromXContent(parser, latestGen, false);
             }
-            assertIndexUUIDs(blobContainer, repositoryData);
+            assertIndexUUIDs(repository, repositoryData);
             assertSnapshotUUIDs(repository, repositoryData);
             assertShardIndexGenerations(blobContainer, repositoryData.shardGenerations());
             return null;
@@ -165,10 +166,10 @@ public final class BlobStoreTestUtil {
         }
     }

-    private static void assertIndexUUIDs(BlobContainer repoRoot, RepositoryData repositoryData) throws IOException {
+    private static void assertIndexUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException {
         final List<String> expectedIndexUUIDs =
             repositoryData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toList());
-        final BlobContainer indicesContainer = repoRoot.children().get("indices");
+        final BlobContainer indicesContainer = repository.blobContainer().children().get("indices");
         final List<String> foundIndexUUIDs;
         if (indicesContainer == null) {
             foundIndexUUIDs = Collections.emptyList();
@@ -178,6 +179,21 @@ public final class BlobStoreTestUtil {
                 s -> s.startsWith("extra") == false).collect(Collectors.toList());
         }
         assertThat(foundIndexUUIDs, containsInAnyOrder(expectedIndexUUIDs.toArray(Strings.EMPTY_ARRAY)));
+        for (String indexId : foundIndexUUIDs) {
+            final Set<String> indexMetaGenerationsFound = indicesContainer.children().get(indexId)
+                .listBlobsByPrefix(BlobStoreRepository.METADATA_PREFIX).keySet().stream()
+                .map(p -> p.replace(BlobStoreRepository.METADATA_PREFIX, "").replace(".dat", ""))
+                .collect(Collectors.toSet());
+            final Set<String> indexMetaGenerationsExpected = new HashSet<>();
+            final IndexId idx =
+                repositoryData.getIndices().values().stream().filter(i -> i.getId().equals(indexId)).findFirst().get();
+            for (SnapshotId snapshotId : repositoryData.getSnapshots(idx)) {
+                indexMetaGenerationsExpected.add(repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, idx));
+            }
+            // TODO: assertEquals(indexMetaGenerationsExpected, indexMetaGenerationsFound); requires cleanup functionality for
+            // index meta generations blobs
+            assertTrue(indexMetaGenerationsFound.containsAll(indexMetaGenerationsExpected));
+        }
     }

     private static void assertSnapshotUUIDs(BlobStoreRepository repository, RepositoryData repositoryData) throws IOException {

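The loop just added predicts blob names through RepositoryData's generation lookup. As a mental model, a simplified sketch of that lookup (field layout and the BwC fallback are assumptions based on this PR's description, not the production class):

    // Simplified sketch: (snapshot, index) resolves to a content identifier; snapshots
    // whose index metadata is identical share one identifier and thus one blob uuid.
    final class IndexMetaGenerationsSketch {
        private final Map<SnapshotId, Map<IndexId, String>> lookup = new HashMap<>();
        private final Map<String, String> identifiers = new HashMap<>(); // identifier -> blob uuid

        String indexMetaBlobId(SnapshotId snapshotId, IndexId indexId) {
            final String identifier = lookup.getOrDefault(snapshotId, Collections.emptyMap()).get(indexId);
            // BwC fallback: pre-upgrade snapshots named the blob after the snapshot's own uuid.
            return identifier == null ? snapshotId.getUUID() : identifiers.get(identifier);
        }
    }
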
@@ -208,8 +224,9 @@ public final class BlobStoreTestUtil {
             assertThat(indices, hasKey(indexId.getId()));
             final BlobContainer indexContainer = indices.get(indexId.getId());
             assertThat(indexContainer.listBlobs(),
-                hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT, snapshotId.getUUID())));
-            final IndexMetadata indexMetadata = repository.getSnapshotIndexMetadata(snapshotId, indexId);
+                hasKey(String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT,
+                    repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId))));
+            final IndexMetadata indexMetadata = repository.getSnapshotIndexMetaData(repositoryData, snapshotId, indexId);
             for (Map.Entry<String, BlobContainer> entry : indexContainer.children().entrySet()) {
                 // Skip Lucene MockFS extraN directory
                 if (entry.getKey().startsWith("extra")) {

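For reference, the listBlobs assertion in the hunk above boils down to one derived name, with the uuid now coming from the deduplication lookup rather than from snapshotId.getUUID() (METADATA_NAME_FORMAT resolves to "meta-%s.dat"):

    // The metadata blob expected in the index container: meta-{uuid}.dat.
    final String blobName = String.format(Locale.ROOT, BlobStoreRepository.METADATA_NAME_FORMAT,
        repositoryData.indexMetaDataGenerations().indexMetaBlobId(snapshotId, indexId));
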
@@ -53,7 +53,6 @@ import org.elasticsearch.snapshots.mockstore.MockRepository;
 import org.elasticsearch.test.ESIntegTestCase;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.test.VersionUtils;
-import org.elasticsearch.threadpool.ThreadPool;
 import org.junit.After;

 import java.io.IOException;

@@ -325,8 +324,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
         logger.info("--> writing downgraded RepositoryData for repository metadata version [{}]", version);
         final RepositoryData repositoryData = getRepositoryData(repoName);
         final XContentBuilder jsonBuilder = JsonXContent.contentBuilder();
-        final boolean writeShardGens = version.onOrAfter(SnapshotsService.SHARD_GEN_IN_REPO_DATA_VERSION);
-        repositoryData.snapshotsToXContent(jsonBuilder, writeShardGens);
+        repositoryData.snapshotsToXContent(jsonBuilder, version);
         final RepositoryData downgradedRepoData = RepositoryData.snapshotsFromXContent(JsonXContent.jsonXContent.createParser(
             NamedXContentRegistry.EMPTY,
             DeprecationHandler.THROW_UNSUPPORTED_OPERATION,

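Replacing the precomputed writeShardGens boolean with the raw Version lets snapshotsToXContent gate each BwC-sensitive field (shard generations and the new index metadata generations) on a single parameter. A sketch of the downgrade write under that assumption:

    // Illustrative sketch: serialize RepositoryData the way a node of `version` would
    // have written it; snapshotsToXContent picks the fields to emit per version.
    static BytesReference downgradedBytes(RepositoryData repositoryData, Version version) throws IOException {
        return BytesReference.bytes(repositoryData.snapshotsToXContent(XContentFactory.jsonBuilder(), version));
    }
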
@@ -334,7 +332,7 @@ public abstract class AbstractSnapshotIntegTestCase extends ESIntegTestCase {
             repositoryData.getGenId(), randomBoolean());
         Files.write(repoPath.resolve(BlobStoreRepository.INDEX_FILE_PREFIX + repositoryData.getGenId()),
             BytesReference.toBytes(BytesReference.bytes(
-                downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), writeShardGens))),
+                downgradedRepoData.snapshotsToXContent(XContentFactory.jsonBuilder(), version))),
             StandardOpenOption.TRUNCATE_EXISTING);
         return oldVersionSnapshot;
     }

@@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.MultiChunkTransfer;
 import org.elasticsearch.indices.recovery.MultiFileWriter;
 import org.elasticsearch.indices.recovery.RecoveryState;
 import org.elasticsearch.repositories.IndexId;
+import org.elasticsearch.repositories.IndexMetaDataGenerations;
 import org.elasticsearch.repositories.Repository;
 import org.elasticsearch.repositories.RepositoryData;
 import org.elasticsearch.repositories.ShardGenerations;

@@ -193,7 +194,7 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
     }

     @Override
-    public IndexMetadata getSnapshotIndexMetadata(SnapshotId snapshotId, IndexId index) throws IOException {
+    public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, SnapshotId snapshotId, IndexId index) {
         assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
         String leaderIndex = index.getName();
         Client remoteClient = getRemoteClusterClient();

@@ -256,7 +257,8 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
             Index index = remoteIndices.get(indexName).getIndex();
             indexSnapshots.put(new IndexId(indexName, index.getUUID()), Collections.singletonList(snapshotId));
         }
-        return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots, ShardGenerations.EMPTY);
+        return new RepositoryData(1, copiedSnapshotIds, snapshotStates, snapshotVersions, indexSnapshots,
+            ShardGenerations.EMPTY, IndexMetaDataGenerations.EMPTY);
         });
     }

@@ -237,7 +237,8 @@ public class SourceOnlySnapshotShardTests extends IndexShardTestCase {
             ShardRoutingState.INITIALIZING,
             new RecoverySource.SnapshotRecoverySource(
                 UUIDs.randomBase64UUID(), new Snapshot("src_only", snapshotId), Version.CURRENT, indexId));
-        IndexMetadata metadata = runAsSnapshot(threadPool, () -> repository.getSnapshotIndexMetadata(snapshotId, indexId));
+        IndexMetadata metadata = runAsSnapshot(threadPool, () ->
+            repository.getSnapshotIndexMetaData(PlainActionFuture.get(repository::getRepositoryData), snapshotId, indexId));
         IndexShard restoredShard = newShard(
             shardRouting, metadata, null, SourceOnlySnapshotRepository.getEngineFactory(), () -> {}, RetentionLeaseSyncer.EMPTY);
         DiscoveryNode discoveryNode = new DiscoveryNode("node_g", buildNewFakeTransportAddress(), Version.CURRENT);

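Since getSnapshotIndexMetaData now needs the RepositoryData that carries the generation lookup, callers that are not on a cluster-state thread can block on the async load, exactly as the test above does. The two-step pattern in isolation:

    // Resolve RepositoryData first (blocking helper, acceptable in tests), then use it
    // to locate the deduplicated index metadata blob.
    final RepositoryData repoData = PlainActionFuture.get(repository::getRepositoryData);
    final IndexMetadata metadata = repository.getSnapshotIndexMetaData(repoData, snapshotId, indexId);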