Implement Shard Snapshot Clone Logic (#62771) (#63260)

First part of the snapshot clone logic that implements the snapshot clone functionality on
the repository level.
This commit is contained in:
Armin Braun 2020-10-05 22:55:52 +02:00 committed by GitHub
parent d027e24b31
commit 860791260d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 378 additions and 0 deletions

View File

@ -0,0 +1,123 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.snapshots;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot;
import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshots;
import org.elasticsearch.index.snapshots.blobstore.SnapshotFiles;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.blobstore.BlobStoreRepository;
import org.elasticsearch.test.ESIntegTestCase;
import java.nio.file.Path;
import java.util.List;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 0)
public class CloneSnapshotIT extends AbstractSnapshotIntegTestCase {
public void testShardClone() throws Exception {
internalCluster().startMasterOnlyNode();
internalCluster().startDataOnlyNode();
final String repoName = "repo-name";
final Path repoPath = randomRepoPath();
createRepository(repoName, "fs", repoPath);
final boolean useBwCFormat = randomBoolean();
if (useBwCFormat) {
initWithSnapshotVersion(repoName, repoPath, SnapshotsService.OLD_SNAPSHOT_FORMAT);
// Re-create repo to clear repository data cache
assertAcked(client().admin().cluster().prepareDeleteRepository(repoName).get());
createRepository(repoName, "fs", repoPath);
}
final String indexName = "test-index";
createIndexWithRandomDocs(indexName, randomIntBetween(5, 10));
final String sourceSnapshot = "source-snapshot";
final SnapshotInfo sourceSnapshotInfo = createFullSnapshot(repoName, sourceSnapshot);
final BlobStoreRepository repository =
(BlobStoreRepository) internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class).repository(repoName);
final RepositoryData repositoryData = getRepositoryData(repoName);
final IndexId indexId = repositoryData.resolveIndexId(indexName);
final int shardId = 0;
final RepositoryShardId repositoryShardId = new RepositoryShardId(indexId, shardId);
final SnapshotId targetSnapshotId = new SnapshotId("target-snapshot", UUIDs.randomBase64UUID(random()));
final String currentShardGen;
if (useBwCFormat) {
currentShardGen = null;
} else {
currentShardGen = repositoryData.shardGenerations().getShardGen(indexId, shardId);
}
final String newShardGeneration = PlainActionFuture.get(f -> repository.cloneShardSnapshot(
sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, currentShardGen, f));
if (useBwCFormat) {
final long gen = Long.parseLong(newShardGeneration);
assertEquals(gen, 1L); // Initial snapshot brought it to 0, clone increments it to 1
}
final BlobStoreIndexShardSnapshot targetShardSnapshot = readShardSnapshot(repository, repositoryShardId, targetSnapshotId);
final BlobStoreIndexShardSnapshot sourceShardSnapshot =
readShardSnapshot(repository, repositoryShardId, sourceSnapshotInfo.snapshotId());
assertThat(targetShardSnapshot.incrementalFileCount(), is(0));
final List<BlobStoreIndexShardSnapshot.FileInfo> sourceFiles = sourceShardSnapshot.indexFiles();
final List<BlobStoreIndexShardSnapshot.FileInfo> targetFiles = targetShardSnapshot.indexFiles();
final int fileCount = sourceFiles.size();
assertEquals(fileCount, targetFiles.size());
for (int i = 0; i < fileCount; i++) {
assertTrue(sourceFiles.get(i).isSame(targetFiles.get(i)));
}
final BlobStoreIndexShardSnapshots shardMetadata = readShardGeneration(repository, repositoryShardId, newShardGeneration);
final List<SnapshotFiles> snapshotFiles = shardMetadata.snapshots();
assertThat(snapshotFiles, hasSize(2));
assertTrue(snapshotFiles.get(0).isSame(snapshotFiles.get(1)));
// verify that repeated cloning is idempotent
final String newShardGeneration2 = PlainActionFuture.get(f -> repository.cloneShardSnapshot(
sourceSnapshotInfo.snapshotId(), targetSnapshotId, repositoryShardId, newShardGeneration, f));
assertEquals(newShardGeneration, newShardGeneration2);
}
private static BlobStoreIndexShardSnapshots readShardGeneration(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
String generation) {
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
() -> BlobStoreRepository.INDEX_SHARD_SNAPSHOTS_FORMAT.read(repository.shardContainer(repositoryShardId.index(),
repositoryShardId.shardId()), generation, NamedXContentRegistry.EMPTY))));
}
private static BlobStoreIndexShardSnapshot readShardSnapshot(BlobStoreRepository repository, RepositoryShardId repositoryShardId,
SnapshotId snapshotId) {
return PlainActionFuture.get(f -> repository.threadPool().generic().execute(ActionRunnable.supply(f,
() -> repository.loadShardSnapshot(repository.shardContainer(repositoryShardId.index(), repositoryShardId.shardId()),
snapshotId))));
}
}

View File

@ -388,6 +388,18 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
this.incrementalSize = incrementalSize;
}
/**
* Creates a new instance has a different name and zero incremental file counts but is identical to this instance in terms of the files
* it references.
*
* @param targetSnapshotName target snapshot name
* @param startTime time the clone operation on the repository was started
* @param time time it took to create the clone
*/
public BlobStoreIndexShardSnapshot asClone(String targetSnapshotName, long startTime, long time) {
return new BlobStoreIndexShardSnapshot(targetSnapshotName, indexVersion, indexFiles, startTime, time, 0, 0);
}
/**
* Returns snapshot name
*

View File

@ -105,6 +105,30 @@ public class BlobStoreIndexShardSnapshots implements Iterable<SnapshotFiles>, To
this.physicalFiles = unmodifiableMap(mapBuilder);
}
/**
* Create a new instance that has a new snapshot by name {@code target} added which shares all files with the snapshot of name
* {@code source}.
*
* @param source source snapshot name
* @param target target snapshot name
* @return new instance with added cloned snapshot
*/
public BlobStoreIndexShardSnapshots withClone(String source, String target) {
SnapshotFiles sourceFiles = null;
for (SnapshotFiles shardSnapshot : shardSnapshots) {
if (shardSnapshot.snapshot().equals(source)) {
sourceFiles = shardSnapshot;
break;
}
}
if (sourceFiles == null) {
throw new IllegalArgumentException("unknown source [" + source + "]");
}
final List<SnapshotFiles> updated = new ArrayList<>(shardSnapshots);
updated.add(sourceFiles.withSnapshotName(target));
return new BlobStoreIndexShardSnapshots(updated);
}
/**
* Returns list of snapshots
*

View File

@ -24,6 +24,7 @@ import org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.F
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Contains a list of files participating in a snapshot
@ -59,6 +60,32 @@ public class SnapshotFiles {
this.shardStateIdentifier = shardStateIdentifier;
}
/**
* Creates a new instance with the given snapshot name but otherwise identical to the current instance.
*/
public SnapshotFiles withSnapshotName(String snapshotName) {
return new SnapshotFiles(snapshotName, indexFiles, shardStateIdentifier);
}
/**
* Checks if the given other instance contains the same files as well as the same {@link #shardStateIdentifier}.
*/
public boolean isSame(SnapshotFiles other) {
if (Objects.equals(shardStateIdentifier, other.shardStateIdentifier) == false) {
return false;
}
final int fileCount = indexFiles.size();
if (other.indexFiles.size() != fileCount) {
return false;
}
for (int i = 0; i < fileCount; i++) {
if (indexFiles.get(i).isSame(other.indexFiles.get(i)) == false) {
return false;
}
}
return true;
}
/**
* Returns an identifier for the shard state that can be used to check whether a shard has changed between
* snapshots or not.

View File

@ -155,6 +155,12 @@ public class FilterRepository implements Repository {
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
}
@Override
public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryShardId shardId, String shardGeneration,
ActionListener<String> listener) {
in.cloneShardSnapshot(source, target, shardId, shardGeneration, listener);
}
@Override
public Lifecycle.State lifecycleState() {
return in.lifecycleState();

View File

@ -276,6 +276,18 @@ public interface Repository extends LifecycleComponent {
void executeConsistentStateUpdate(Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask, String source,
Consumer<Exception> onFailure);
/**
* Clones a shard snapshot.
*
* @param source source snapshot
* @param target target snapshot
* @param shardId shard id
* @param shardGeneration shard generation in repo
* @param listener listener to complete with new shard generation once clone has completed
*/
void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryShardId shardId, @Nullable String shardGeneration,
ActionListener<String> listener);
/**
* Hook that allows a repository to filter the user supplied snapshot metadata in {@link SnapshotsInProgress.Entry#userMetadata()}
* during snapshot initialization.

View File

@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.repositories;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import java.io.IOException;
import java.util.Objects;
/**
* Represents a shard snapshot in a repository.
*/
public final class RepositoryShardId implements Writeable {
private final IndexId index;
private final int shard;
public RepositoryShardId(IndexId index, int shard) {
assert index != null;
this.index = index;
this.shard = shard;
}
public RepositoryShardId(StreamInput in) throws IOException {
this(new IndexId(in), in.readVInt());
}
public IndexId index() {
return index;
}
public String indexName() {
return index.getName();
}
public int shardId() {
return shard;
}
@Override
public int hashCode() {
return Objects.hash(index, shard);
}
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj instanceof RepositoryShardId == false) {
return false;
}
final RepositoryShardId that = (RepositoryShardId) obj;
return that.index.equals(index) && that.shard == shard;
}
@Override
public void writeTo(StreamOutput out) throws IOException {
index.writeTo(out);
out.writeVInt(shard);
}
@Override
public String toString() {
return "RepositoryShardId{" + index + "}{" + shard + "}";
}
}

View File

@ -105,6 +105,7 @@ import org.elasticsearch.repositories.RepositoryCleanupResult;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryException;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.RepositoryStats;
import org.elasticsearch.repositories.RepositoryVerificationException;
import org.elasticsearch.snapshots.SnapshotCreationException;
@ -398,6 +399,68 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}, onFailure));
}
@Override
public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryShardId shardId,
@Nullable String shardGeneration, ActionListener<String> listener) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot clone shard snapshot on a readonly repository"));
return;
}
final IndexId index = shardId.index();
final int shardNum = shardId.shardId();
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
executor.execute(ActionRunnable.supply(listener, () -> {
final long startTime = threadPool.absoluteTimeInMillis();
final BlobContainer shardContainer = shardContainer(index, shardNum);
final BlobStoreIndexShardSnapshots existingSnapshots;
final String newGen;
final String existingShardGen;
if (shardGeneration == null) {
Tuple<BlobStoreIndexShardSnapshots, Long> tuple = buildBlobStoreIndexShardSnapshots(
shardContainer.listBlobsByPrefix(INDEX_FILE_PREFIX).keySet(), shardContainer);
existingShardGen = String.valueOf(tuple.v2());
newGen = String.valueOf(tuple.v2() + 1);
existingSnapshots = tuple.v1();
} else {
newGen = UUIDs.randomBase64UUID();
existingSnapshots = buildBlobStoreIndexShardSnapshots(Collections.emptySet(), shardContainer, shardGeneration).v1();
existingShardGen = shardGeneration;
}
SnapshotFiles existingTargetFiles = null;
SnapshotFiles sourceFiles = null;
for (SnapshotFiles existingSnapshot : existingSnapshots) {
final String snapshotName = existingSnapshot.snapshot();
if (snapshotName.equals(target.getName())) {
existingTargetFiles = existingSnapshot;
} else if (snapshotName.equals(source.getName())) {
sourceFiles = existingSnapshot;
}
if (sourceFiles != null && existingTargetFiles != null) {
break;
}
}
if (sourceFiles == null) {
throw new RepositoryException(metadata.name(), "Can't create clone of [" + shardId + "] for snapshot ["
+ target + "]. The source snapshot [" + source + "] was not found in the shard metadata.");
}
if (existingTargetFiles != null) {
if (existingTargetFiles.isSame(sourceFiles)) {
return existingShardGen;
}
throw new RepositoryException(metadata.name(), "Can't create clone of [" + shardId + "] for snapshot ["
+ target + "]. A snapshot by that name already exists for this shard.");
}
final BlobStoreIndexShardSnapshot sourceMeta = loadShardSnapshot(shardContainer, source);
logger.trace("[{}] [{}] writing shard snapshot file for clone", shardId, target);
INDEX_SHARD_SNAPSHOT_FORMAT.write(sourceMeta.asClone(target.getName(), startTime,
threadPool.absoluteTimeInMillis() - startTime),
shardContainer, target.getUUID(), compress);
INDEX_SHARD_SNAPSHOTS_FORMAT.write(existingSnapshots.withClone(source.getName(), target.getName()), shardContainer, newGen,
compress);
return newGen;
}));
}
// Inspects all cluster state elements that contain a hint about what the current repository generation is and updates
// #latestKnownRepoGen if a newer than currently known generation is found
@Override
@ -1806,6 +1869,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, String shardStateIdentifier, IndexShardSnapshotStatus snapshotStatus,
Version repositoryMetaVersion, Map<String, Object> userMetadata, ActionListener<String> listener) {
if (isReadOnly()) {
listener.onFailure(new RepositoryException(metadata.name(), "cannot snapshot shard on a readonly repository"));
return;
}
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
try {

View File

@ -292,6 +292,12 @@ public class RepositoriesServiceTests extends ESTestCase {
Consumer<Exception> onFailure) {
}
@Override
public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryShardId shardId, String shardGeneration,
ActionListener<String> listener) {
}
@Override
public Lifecycle.State lifecycleState() {
return null;

View File

@ -35,6 +35,7 @@ import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.ShardGenerations;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfo;
@ -162,4 +163,10 @@ public abstract class RestoreOnlyRepository extends AbstractLifecycleComponent i
Consumer<Exception> onFailure) {
throw new UnsupportedOperationException("Unsupported for restore-only repository");
}
@Override
public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryShardId repositoryShardId, String shardGeneration,
ActionListener<String> listener) {
throw new UnsupportedOperationException("Unsupported for restore-only repository");
}
}

View File

@ -61,6 +61,7 @@ import org.elasticsearch.indices.recovery.MultiFileWriter;
import org.elasticsearch.indices.recovery.RecoveryState;
import org.elasticsearch.repositories.IndexId;
import org.elasticsearch.repositories.IndexMetaDataGenerations;
import org.elasticsearch.repositories.RepositoryShardId;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.ShardGenerations;
@ -451,6 +452,12 @@ public class CcrRepository extends AbstractLifecycleComponent implements Reposit
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
@Override
public void cloneShardSnapshot(SnapshotId source, SnapshotId target, RepositoryShardId shardId, String shardGeneration,
ActionListener<String> listener) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}
private void updateMappings(Client leaderClient, Index leaderIndex, long leaderMappingVersion,
Client followerClient, Index followerIndex) {
final PlainActionFuture<IndexMetadata> indexMetadataFuture = new PlainActionFuture<>();