Simplify BlobStoreRepository (Flatten Nested Classes) (#42833) (#44060)

* In the current codebase it is hardly obvious which code operates on a shard and runs on a data node, and which code operates on the global metadata and runs on the master
   * Fixed by adjusting the method names accordingly
* The nested context classes add little if any value; they merely spread the parameters of a shard snapshot create or delete all over the place, since their constructors can be inlined at every call site
   * Fixed by flattening the nested classes into BlobStoreRepository (see the sketch below)
* Also:
  * Inlined the other single-use inner classes
Armin Braun 2019-07-08 14:57:27 +02:00 committed by GitHub
parent afe81fd625
commit 2918363e90
2 changed files with 349 additions and 453 deletions
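The shape of the refactoring, reduced to a minimal sketch (the class and parameter names below are illustrative, not the actual Elasticsearch types): a nested context class whose constructor merely captures parameters is dissolved into a plain private method that takes those parameters directly.

// Before: a nested class that exists only to carry parameters to one method.
class NestedStyleRepository {
    private class DeleteContext {
        private final String snapshotId;
        private final int shardId;

        DeleteContext(String snapshotId, int shardId) {
            this.snapshotId = snapshotId;
            this.shardId = shardId;
        }

        void delete() {
            // ... body uses snapshotId and shardId ...
        }
    }

    void deleteShard(String snapshotId, int shardId) {
        new DeleteContext(snapshotId, shardId).delete();
    }
}

// After: the constructor is inlined away; parameters flow straight into a method.
class FlatStyleRepository {
    void deleteShard(String snapshotId, int shardId) {
        deleteShardSnapshot(snapshotId, shardId);
    }

    private void deleteShardSnapshot(String snapshotId, int shardId) {
        // ... same body as DeleteContext#delete(), parameters passed explicitly ...
    }
}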

BlobStoreRepository.java

@@ -502,8 +502,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
if (indexMetaData != null) {
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
try {
final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId);
new Context(snapshotId, indexId, sid, sid).delete();
deleteShardSnapshot(indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
} catch (SnapshotException ex) {
final int finalShardId = shardId;
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
@@ -526,9 +525,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
}
/**
* {@inheritDoc}
*/
@Override
public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
final List<IndexId> indices,
@@ -541,7 +537,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
final Map<String, Object> userMetadata) {
SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
indices.stream().map(IndexId::getName).collect(Collectors.toList()),
startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
includeGlobalState, userMetadata);
try {
final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
@@ -845,11 +841,130 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
final ShardId shardId = store.shardId();
final long startTime = threadPool.absoluteTimeInMillis();
try {
snapshotContext.snapshot(snapshotIndexCommit);
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
final BlobContainer shardContainer = shardContainer(indexId, shardId);
final Map<String, BlobMetaData> blobs;
try {
blobs = shardContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
throw new IndexShardSnapshotFailedException(shardId,
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
}
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
store.incRef();
try {
ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
final Store.MetadataSnapshot metadata;
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
final Collection<String> fileNames;
try {
logger.trace(
"[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadata = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
int indexIncrementalFileCount = 0;
int indexTotalNumberOfFiles = 0;
long indexIncrementalSize = 0;
long indexTotalFileCount = 0;
for (String fileName : fileNames) {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
indexTotalFileCount += md.length();
indexTotalNumberOfFiles++;
if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
}
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
}
}
} finally {
store.decRef();
}
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
// now create and write the commit point
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
indexCommitPointFiles,
lastSnapshotStatus.getStartTime(),
threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
lastSnapshotStatus.getIncrementalFileCount(),
lastSnapshotStatus.getIncrementalSize()
);
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID());
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
for (SnapshotFiles point : snapshots) {
newSnapshotsList.add(point);
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer,
shardId, snapshotId);
snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
} catch (Exception e) {
snapshotStatus.moveToFailed(System.currentTimeMillis(), ExceptionsHelper.detailedMessage(e));
snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
if (e instanceof IndexShardSnapshotFailedException) {
throw (IndexShardSnapshotFailedException) e;
} else {
@@ -859,16 +974,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
@Override
public void restoreShard(Store store, SnapshotId snapshotId,
Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
RecoveryState recoveryState) {
ShardId shardId = store.shardId();
final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
final RestoreContext snapshotContext =
new RestoreContext(shardId, snapshotId, recoveryState, shardContainer(indexId, snapshotShardId));
try {
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
final BlobContainer container = shardContainer(indexId, snapshotShardId);
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
snapshotContext.restore(snapshotFiles, store);
new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) {
@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) {
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(fileInfo.partName(slice));
}
};
return restoreRateLimiter == null ? dataBlobCompositeStream
: new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc);
}
}.restore(snapshotFiles, store);
} catch (Exception e) {
throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
}
@@ -876,8 +1001,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
Context context = new Context(snapshotId, indexId, shardId);
BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(),
snapshot.incrementalFileCount(), snapshot.totalFileCount(),
snapshot.incrementalSize(), snapshot.totalSize());
@@ -923,458 +1047,230 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
}
/**
* Context for snapshot/restore operations
* Delete shard snapshot
*/
private class Context {
protected final SnapshotId snapshotId;
protected final ShardId shardId;
protected final BlobContainer blobContainer;
Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
this(snapshotId, indexId, shardId, shardId);
private void deleteShardSnapshot(IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) {
final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
final Map<String, BlobMetaData> blobs;
try {
blobs = shardContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
}
Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
this.snapshotId = snapshotId;
this.shardId = shardId;
blobContainer = shardContainer(indexId, snapshotShardId);
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
try {
indexShardSnapshotFormat.delete(shardContainer, snapshotId.getUUID());
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", snapshotShardId, snapshotId), e);
}
/**
* Delete shard snapshot
*/
public void delete() {
final Map<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
}
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
try {
indexShardSnapshotFormat.delete(blobContainer, snapshotId.getUUID());
} catch (IOException e) {
logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId), e);
}
// Build a list of snapshots that should be preserved
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
for (SnapshotFiles point : snapshots) {
if (!point.snapshot().equals(snapshotId.getName())) {
newSnapshotsList.add(point);
}
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs, "snapshot deletion [" + snapshotId + "]");
}
/**
* Loads information about shard snapshot
*/
BlobStoreIndexShardSnapshot loadSnapshot() {
try {
return indexShardSnapshotFormat.read(blobContainer, snapshotId.getUUID());
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId, "failed to read shard snapshot file for " + shardId, ex);
}
}
/**
* Writes a new index file for the shard and removes all unreferenced files from the repository.
*
* We need to be really careful in handling index files in case of failures to make sure we don't
* have index file that points to files that were deleted.
*
* @param snapshots list of active snapshots in the container
* @param fileListGeneration the generation number of the snapshot index file
* @param blobs list of blobs in the container
* @param reason a reason explaining why the shard index file is written
*/
protected void finalize(final List<SnapshotFiles> snapshots,
final int fileListGeneration,
final Map<String, BlobMetaData> blobs,
final String reason) {
final String indexGeneration = Integer.toString(fileListGeneration);
try {
final List<String> blobsToDelete;
if (snapshots.isEmpty()) {
// If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
blobsToDelete = new ArrayList<>(blobs.keySet());
} else {
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, blobContainer, indexGeneration);
// Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
blobsToDelete = blobs.keySet().stream().filter(blob ->
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|| blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
}
try {
blobContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
snapshotId, shardId), e);
}
} catch (IOException e) {
String message =
"Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]";
throw new IndexShardSnapshotFailedException(shardId, message, e);
}
}
/**
* Loads all available snapshots in the repository
*
* @param blobs list of blobs in repository
* @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
*/
protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs) {
int latest = -1;
Set<String> blobKeys = blobs.keySet();
for (String name : blobKeys) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
if (latest >= 0) {
try {
final BlobStoreIndexShardSnapshots shardSnapshots =
indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest));
return new Tuple<>(shardSnapshots, latest);
} catch (IOException e) {
final String file = SNAPSHOT_INDEX_PREFIX + latest;
logger.warn(() -> new ParameterizedMessage("failed to read index file [{}]", file), e);
}
} else if (blobKeys.isEmpty() == false) {
logger.warn("Could not find a readable index-N file in a non-empty shard snapshot directory [{}]", blobContainer.path());
}
// We couldn't load the index file - falling back to loading individual snapshots
List<SnapshotFiles> snapshots = new ArrayList<>();
for (String name : blobKeys) {
try {
BlobStoreIndexShardSnapshot snapshot = null;
if (name.startsWith(SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
}
if (snapshot != null) {
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("failed to read commit point [{}]", name), e);
}
}
return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), latest);
}
}
/**
* Context for snapshot operations
*/
private class SnapshotContext extends Context {
private final Store store;
private final IndexShardSnapshotStatus snapshotStatus;
private final long startTime;
/**
* Constructs new context
*
* @param store store to be snapshotted
* @param snapshotId snapshot id
* @param indexId the id of the index being snapshotted
* @param snapshotStatus snapshot status to report progress
*/
SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
super(snapshotId, indexId, store.shardId());
this.snapshotStatus = snapshotStatus;
this.store = store;
this.startTime = startTime;
}
/**
* Create snapshot from index commit point
*
* @param snapshotIndexCommit snapshot commit point
*/
public void snapshot(final IndexCommit snapshotIndexCommit) {
logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
final Map<String, BlobMetaData> blobs;
try {
blobs = blobContainer.listBlobs();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
}
Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
BlobStoreIndexShardSnapshots snapshots = tuple.v1();
int fileListGeneration = tuple.v2();
if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
throw new IndexShardSnapshotFailedException(shardId,
"Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
}
final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
store.incRef();
int indexIncrementalFileCount = 0;
int indexTotalNumberOfFiles = 0;
long indexIncrementalSize = 0;
long indexTotalFileCount = 0;
try {
ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
final Store.MetadataSnapshot metadata;
// TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
final Collection<String> fileNames;
try {
logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
metadata = store.getMetadata(snapshotIndexCommit);
fileNames = snapshotIndexCommit.getFileNames();
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
}
for (String fileName : fileNames) {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
final StoreFileMetaData md = metadata.get(fileName);
BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
if (filesInfo != null) {
for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
// a commit point file with the same name, size and checksum was already copied to repository
// we will reuse it for this snapshot
existingFileInfo = fileInfo;
break;
}
}
}
indexTotalFileCount += md.length();
indexTotalNumberOfFiles++;
if (existingFileInfo == null) {
indexIncrementalFileCount++;
indexIncrementalSize += md.length();
// create a new FileInfo
BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
indexCommitPointFiles.add(snapshotFileInfo);
filesToSnapshot.add(snapshotFileInfo);
} else {
indexCommitPointFiles.add(existingFileInfo);
}
}
snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
try {
snapshotFile(snapshotFileInfo);
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
}
}
} finally {
store.decRef();
}
final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
// now create and write the commit point
final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
lastSnapshotStatus.getIndexVersion(),
indexCommitPointFiles,
lastSnapshotStatus.getStartTime(),
// snapshotStatus.startTime() is assigned on the same machine,
// so it's safe to use with VLong
System.currentTimeMillis() - lastSnapshotStatus.getStartTime(),
lastSnapshotStatus.getIncrementalFileCount(),
lastSnapshotStatus.getIncrementalSize()
);
//TODO: The time stored in snapshot doesn't include cleanup time.
logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
try {
indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID());
} catch (IOException e) {
throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
}
// delete all files that are not referenced by any commit point
// build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
for (SnapshotFiles point : snapshots) {
// Build a list of snapshots that should be preserved
List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
for (SnapshotFiles point : snapshots) {
if (!point.snapshot().equals(snapshotId.getName())) {
newSnapshotsList.add(point);
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalize(newSnapshotsList, fileListGeneration + 1, blobs, "snapshot creation [" + snapshotId + "]");
snapshotStatus.moveToDone(System.currentTimeMillis());
}
/**
* Snapshot individual file
*
* @param fileInfo file to be snapshotted
*/
private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
final String file = fileInfo.physicalName();
try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final long partBytes = fileInfo.partBytes(i);
final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes);
InputStream inputStream = inputStreamIndexInput;
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter,
snapshotRateLimitingTimeInNanos::inc);
}
inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true);
}
Store.verify(indexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Exception t) {
failStoreIfCorrupted(t);
snapshotStatus.addProcessedFile(0);
throw t;
}
}
private void failStoreIfCorrupted(Exception e) {
if (Lucene.isCorruptionException(e)) {
try {
store.markStoreCorrupted((IOException) e);
} catch (IOException inner) {
inner.addSuppressed(e);
logger.warn("store cannot be marked as corrupted", inner);
}
}
}
/**
* Checks if snapshot file already exists in the list of blobs
*
* @param fileInfo file to check
* @param blobs list of blobs
* @return true if file exists in the list of blobs
*/
private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map<String, BlobMetaData> blobs) {
BlobMetaData blobMetaData = blobs.get(fileInfo.name());
if (blobMetaData != null) {
return blobMetaData.length() == fileInfo.length();
} else if (blobs.containsKey(fileInfo.partName(0))) {
// multi part file sum up the size and check
int part = 0;
long totalSize = 0;
while (true) {
blobMetaData = blobs.get(fileInfo.partName(part++));
if (blobMetaData == null) {
break;
}
totalSize += blobMetaData.length();
}
return totalSize == fileInfo.length();
}
// no file, not exact and not multipart
return false;
}
private class AbortableInputStream extends FilterInputStream {
private final String fileName;
AbortableInputStream(InputStream delegate, String fileName) {
super(delegate);
this.fileName = fileName;
}
@Override
public int read() throws IOException {
checkAborted();
return in.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkAborted();
return in.read(b, off, len);
}
private void checkAborted() {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
}
}
// finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot deletion [" + snapshotId + "]", shardContainer,
snapshotShardId, snapshotId);
}
private static final class PartSliceStream extends SlicedInputStream {
private final BlobContainer container;
private final BlobStoreIndexShardSnapshot.FileInfo info;
PartSliceStream(BlobContainer container, BlobStoreIndexShardSnapshot.FileInfo info) {
super(info.numberOfParts());
this.info = info;
this.container = container;
}
@Override
protected InputStream openSlice(long slice) throws IOException {
return container.readBlob(info.partName(slice));
/**
* Loads information about shard snapshot
*/
private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
try {
return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
} catch (IOException ex) {
throw new SnapshotException(metadata.name(), snapshotId,
"failed to read shard snapshot file for [" + shardContainer.path() + ']', ex);
}
}
/**
* Context for restore operations
* Writes a new index file for the shard and removes all unreferenced files from the repository.
*
* We need to be really careful in handling index files in case of failures to make sure we don't
* have index file that points to files that were deleted.
*
* @param snapshots list of active snapshots in the container
* @param fileListGeneration the generation number of the current snapshot index file
* @param blobs list of blobs in the container
* @param reason a reason explaining why the shard index file is written
*/
private class RestoreContext extends FileRestoreContext {
private void finalizeShard(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs,
String reason, BlobContainer shardContainer, ShardId shardId, SnapshotId snapshotId) {
final String indexGeneration = Integer.toString(fileListGeneration + 1);
try {
final List<String> blobsToDelete;
if (snapshots.isEmpty()) {
// If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
blobsToDelete = new ArrayList<>(blobs.keySet());
} else {
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
// Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
blobsToDelete = blobs.keySet().stream().filter(blob ->
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|| blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
}
try {
shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
snapshotId, shardId), e);
}
} catch (IOException e) {
String message =
"Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]";
throw new IndexShardSnapshotFailedException(shardId, message, e);
}
}
private final BlobContainer blobContainer;
/**
* Constructs new restore context
* @param shardId shard id to restore into
* @param snapshotId snapshot id
* @param recoveryState recovery state to report progress
* @param blobContainer the blob container to read the files from
*/
RestoreContext(ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
super(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE);
this.blobContainer = blobContainer;
/**
* Loads all available snapshots in the repository
*
* @param blobs list of blobs in repository
* @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
*/
private Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs,
BlobContainer shardContainer) {
int latest = -1;
Set<String> blobKeys = blobs.keySet();
for (String name : blobKeys) {
if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
try {
int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
if (gen > latest) {
latest = gen;
}
} catch (NumberFormatException ex) {
logger.warn("failed to parse index file name [{}]", name);
}
}
}
if (latest >= 0) {
try {
final BlobStoreIndexShardSnapshots shardSnapshots =
indexShardSnapshotsFormat.read(shardContainer, Integer.toString(latest));
return new Tuple<>(shardSnapshots, latest);
} catch (IOException e) {
final String file = SNAPSHOT_INDEX_PREFIX + latest;
logger.warn(() -> new ParameterizedMessage("failed to read index file [{}]", file), e);
}
} else if (blobKeys.isEmpty() == false) {
logger.warn("Could not find a readable index-N file in a non-empty shard snapshot directory [{}]", shardContainer.path());
}
@Override
protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
if (restoreRateLimiter == null) {
return new PartSliceStream(blobContainer, fileInfo);
} else {
RateLimitingInputStream.Listener listener = restoreRateLimitingTimeInNanos::inc;
return new RateLimitingInputStream(new PartSliceStream(blobContainer, fileInfo), restoreRateLimiter, listener);
// We couldn't load the index file - falling back to loading individual snapshots
List<SnapshotFiles> snapshots = new ArrayList<>();
for (String name : blobKeys) {
try {
BlobStoreIndexShardSnapshot snapshot = null;
if (name.startsWith(SNAPSHOT_PREFIX)) {
snapshot = indexShardSnapshotFormat.readBlob(shardContainer, name);
}
if (snapshot != null) {
snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
}
} catch (IOException e) {
logger.warn(() -> new ParameterizedMessage("Failed to read blob [{}]", name), e);
}
}
return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), latest);
}
/**
* Snapshot individual file
* @param fileInfo file to be snapshotted
*/
private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId indexId, ShardId shardId, SnapshotId snapshotId,
IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException {
final BlobContainer shardContainer = shardContainer(indexId, shardId);
final String file = fileInfo.physicalName();
try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
for (int i = 0; i < fileInfo.numberOfParts(); i++) {
final long partBytes = fileInfo.partBytes(i);
InputStream inputStream = new InputStreamIndexInput(indexInput, partBytes);
if (snapshotRateLimiter != null) {
inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter,
snapshotRateLimitingTimeInNanos::inc);
}
// Make reads abortable by mutating the snapshotStatus object
inputStream = new FilterInputStream(inputStream) {
@Override
public int read() throws IOException {
checkAborted();
return super.read();
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkAborted();
return super.read(b, off, len);
}
private void checkAborted() {
if (snapshotStatus.isAborted()) {
logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId,
snapshotId, fileInfo.physicalName());
throw new IndexShardSnapshotFailedException(shardId, "Aborted");
}
}
};
shardContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true);
}
Store.verify(indexInput);
snapshotStatus.addProcessedFile(fileInfo.length());
} catch (Exception t) {
failStoreIfCorrupted(store, t);
snapshotStatus.addProcessedFile(0);
throw t;
}
}
private static void failStoreIfCorrupted(Store store, Exception e) {
if (Lucene.isCorruptionException(e)) {
try {
store.markStoreCorrupted((IOException) e);
} catch (IOException inner) {
inner.addSuppressed(e);
logger.warn("store cannot be marked as corrupted", inner);
}
}
}
/**
* Checks if snapshot file already exists in the list of blobs
* @param fileInfo file to check
* @param blobs list of blobs
* @return true if file exists in the list of blobs
*/
private static boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map<String, BlobMetaData> blobs) {
BlobMetaData blobMetaData = blobs.get(fileInfo.name());
if (blobMetaData != null) {
return blobMetaData.length() == fileInfo.length();
} else if (blobs.containsKey(fileInfo.partName(0))) {
// multi part file sum up the size and check
int part = 0;
long totalSize = 0;
while (true) {
blobMetaData = blobs.get(fileInfo.partName(part++));
if (blobMetaData == null) {
break;
}
totalSize += blobMetaData.length();
}
return totalSize == fileInfo.length();
}
// no file, not exact and not multipart
return false;
}
}
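The rewritten restoreShard above inlines its FileRestoreContext and opens each file as a SlicedInputStream over the file's data-blob parts. A self-contained sketch of that concatenation idea, using plain java.io types instead of the Elasticsearch classes (ConcatenatingStream and its demo are hypothetical names):

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

// Concatenates a fixed number of lazily opened slices into one logical stream,
// mirroring how SlicedInputStream stitches data-blob parts back together.
abstract class ConcatenatingStream extends InputStream {
    private final int slices;
    private int current = -1;
    private InputStream in;

    ConcatenatingStream(int slices) {
        this.slices = slices;
    }

    protected abstract InputStream openSlice(int slice) throws IOException;

    @Override
    public int read() throws IOException {
        while (true) {
            if (in != null) {
                int b = in.read();
                if (b != -1) {
                    return b;
                }
                in.close(); // current slice exhausted, move to the next one
                in = null;
            }
            if (++current >= slices) {
                return -1; // all slices exhausted
            }
            in = openSlice(current);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[][] parts = { "snap".getBytes(), "shot".getBytes() };
        InputStream s = new ConcatenatingStream(parts.length) {
            @Override
            protected InputStream openSlice(int slice) {
                return new ByteArrayInputStream(parts[slice]);
            }
        };
        int b;
        while ((b = s.read()) != -1) {
            System.out.print((char) b);
        }
        System.out.println(); // prints "snapshot"
    }
}

Opening slices lazily matters here: each part is a separate blob read, so parts that the rate limiter has not reached yet are never fetched.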

SnapshotsService.java

@@ -285,7 +285,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
request.partial(),
State.INIT,
snapshotIndices,
System.currentTimeMillis(),
threadPool.absoluteTimeInMillis(),
repositoryData.getGenId(),
null,
request.userMetadata());
@@ -1168,7 +1168,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// add the snapshot deletion to the cluster state
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
snapshot,
System.currentTimeMillis(),
threadPool.absoluteTimeInMillis(),
repositoryStateId
);
if (deletionsInProgress != null) {
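
Across both files the commit also swaps System.currentTimeMillis() for threadPool.absoluteTimeInMillis(). A minimal sketch of the underlying pattern, assuming a cached-clock component along the lines of Elasticsearch's ThreadPool (the TimeSource and CachedTimeSource names here are illustrative, not the actual API):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// A single component owns the clock: production code reads a periodically
// refreshed cached value, and tests can substitute a deterministic source.
interface TimeSource {
    long absoluteTimeInMillis();
}

final class CachedTimeSource implements TimeSource, AutoCloseable {
    private final AtomicLong cachedMillis = new AtomicLong(System.currentTimeMillis());
    private final ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();

    CachedTimeSource(long refreshIntervalMillis) {
        refresher.scheduleAtFixedRate(
            () -> cachedMillis.set(System.currentTimeMillis()),
            refreshIntervalMillis, refreshIntervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public long absoluteTimeInMillis() {
        return cachedMillis.get(); // cheap read, no syscall at each call site
    }

    @Override
    public void close() {
        refresher.shutdownNow();
    }
}

Routing every snapshot timestamp through one injected source keeps hot paths off the syscall and gives tests a single seam through which the clock can be controlled.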