diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
index 3b31e2a7d53..85b1721c978 100644
--- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
+++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java
@@ -502,8 +502,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         if (indexMetaData != null) {
             for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
                 try {
-                    final ShardId sid = new ShardId(indexMetaData.getIndex(), shardId);
-                    new Context(snapshotId, indexId, sid, sid).delete();
+                    deleteShardSnapshot(indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
                 } catch (SnapshotException ex) {
                     final int finalShardId = shardId;
                     logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
@@ -526,9 +525,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
-    /**
-     * {@inheritDoc}
-     */
     @Override
     public SnapshotInfo finalizeSnapshot(final SnapshotId snapshotId,
                                          final List<IndexId> indices,
@@ -541,7 +537,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                                          final Map<String, Object> userMetadata) {
         SnapshotInfo blobStoreSnapshot = new SnapshotInfo(snapshotId,
             indices.stream().map(IndexId::getName).collect(Collectors.toList()),
-            startTime, failure, System.currentTimeMillis(), totalShards, shardFailures,
+            startTime, failure, threadPool.absoluteTimeInMillis(), totalShards, shardFailures,
             includeGlobalState, userMetadata);
         try {
             final RepositoryData updatedRepositoryData = getRepositoryData().addSnapshot(snapshotId, blobStoreSnapshot.state(), indices);
@@ -845,11 +841,130 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     @Override
     public void snapshotShard(Store store, MapperService mapperService, SnapshotId snapshotId, IndexId indexId,
                               IndexCommit snapshotIndexCommit, IndexShardSnapshotStatus snapshotStatus) {
-        SnapshotContext snapshotContext = new SnapshotContext(store, snapshotId, indexId, snapshotStatus, System.currentTimeMillis());
+        final ShardId shardId = store.shardId();
+        final long startTime = threadPool.absoluteTimeInMillis();
         try {
-            snapshotContext.snapshot(snapshotIndexCommit);
+            logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
+
+            final BlobContainer shardContainer = shardContainer(indexId, shardId);
+            final Map<String, BlobMetaData> blobs;
+            try {
+                blobs = shardContainer.listBlobs();
+            } catch (IOException e) {
+                throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
+            }
+
+            Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+            BlobStoreIndexShardSnapshots snapshots = tuple.v1();
+            int fileListGeneration = tuple.v2();
+
+            if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
+                throw new IndexShardSnapshotFailedException(shardId,
+                    "Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
+            }
+
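+            // Decide, file by file, whether a blob written by an earlier snapshot can be reused
+            // (same name, length and checksum) or the file has to be uploaded; this comparison is
+            // what makes shard snapshots incremental.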
+            final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
+            store.incRef();
+            try {
+                ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
+                final Store.MetadataSnapshot metadata;
+                // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
+                final Collection<String> fileNames;
+                try {
+                    logger.trace(
+                        "[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
+                    metadata = store.getMetadata(snapshotIndexCommit);
+                    fileNames = snapshotIndexCommit.getFileNames();
+                } catch (IOException e) {
+                    throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
+                }
+                int indexIncrementalFileCount = 0;
+                int indexTotalNumberOfFiles = 0;
+                long indexIncrementalSize = 0;
+                long indexTotalFileCount = 0;
+                for (String fileName : fileNames) {
+                    if (snapshotStatus.isAborted()) {
+                        logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
+                        throw new IndexShardSnapshotFailedException(shardId, "Aborted");
+                    }
+
+                    logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
+                    final StoreFileMetaData md = metadata.get(fileName);
+                    BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
+                    List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
+                    if (filesInfo != null) {
+                        for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
+                            if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
+                                // a commit point file with the same name, size and checksum was already copied to repository
+                                // we will reuse it for this snapshot
+                                existingFileInfo = fileInfo;
+                                break;
+                            }
+                        }
+                    }
+
+                    indexTotalFileCount += md.length();
+                    indexTotalNumberOfFiles++;
+
+                    if (existingFileInfo == null) {
+                        indexIncrementalFileCount++;
+                        indexIncrementalSize += md.length();
+                        // create a new FileInfo
+                        BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
+                            new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
+                        indexCommitPointFiles.add(snapshotFileInfo);
+                        filesToSnapshot.add(snapshotFileInfo);
+                    } else {
+                        indexCommitPointFiles.add(existingFileInfo);
+                    }
+                }
+
+                snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
+                    indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
+
+                for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
+                    try {
+                        snapshotFile(snapshotFileInfo, indexId, shardId, snapshotId, snapshotStatus, store);
+                    } catch (IOException e) {
+                        throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
+                    }
+                }
+            } finally {
+                store.decRef();
+            }
+
+            final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
+
+            // now create and write the commit point
+            final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
+                lastSnapshotStatus.getIndexVersion(),
+                indexCommitPointFiles,
+                lastSnapshotStatus.getStartTime(),
+                threadPool.absoluteTimeInMillis() - lastSnapshotStatus.getStartTime(),
+                lastSnapshotStatus.getIncrementalFileCount(),
+                lastSnapshotStatus.getIncrementalSize()
+            );
+
+            logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
+            try {
+                indexShardSnapshotFormat.write(snapshot, shardContainer, snapshotId.getUUID());
+            } catch (IOException e) {
+                throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
+            }
+
+            // delete all files that are not referenced by any commit point
+            // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
+            List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
+            newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
+            for (SnapshotFiles point : snapshots) {
+                newSnapshotsList.add(point);
+            }
+            // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
+            finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer,
+                shardId, snapshotId);
+            snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
         } catch (Exception e) {
-            snapshotStatus.moveToFailed(System.currentTimeMillis(), ExceptionsHelper.detailedMessage(e));
+            snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
             if (e instanceof IndexShardSnapshotFailedException) {
                 throw (IndexShardSnapshotFailedException) e;
             } else {
@@ -859,16 +974,26 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     @Override
-    public void restoreShard(Store store, SnapshotId snapshotId,
-                             Version version, IndexId indexId, ShardId snapshotShardId, RecoveryState recoveryState) {
+    public void restoreShard(Store store, SnapshotId snapshotId, Version version, IndexId indexId, ShardId snapshotShardId,
+                             RecoveryState recoveryState) {
         ShardId shardId = store.shardId();
-        final Context context = new Context(snapshotId, indexId, shardId, snapshotShardId);
-        final RestoreContext snapshotContext =
-            new RestoreContext(shardId, snapshotId, recoveryState, shardContainer(indexId, snapshotShardId));
         try {
-            BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
+            final BlobContainer container = shardContainer(indexId, snapshotShardId);
+            BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(container, snapshotId);
             SnapshotFiles snapshotFiles = new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles());
-            snapshotContext.restore(snapshotFiles, store);
+            new FileRestoreContext(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE) {
+                @Override
+                protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
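+                    // open the file's blob parts one at a time and expose them as a single stream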
+                    final InputStream dataBlobCompositeStream = new SlicedInputStream(fileInfo.numberOfParts()) {
+                        @Override
+                        protected InputStream openSlice(long slice) throws IOException {
+                            return container.readBlob(fileInfo.partName(slice));
+                        }
+                    };
+                    return restoreRateLimiter == null ? dataBlobCompositeStream
+                        : new RateLimitingInputStream(dataBlobCompositeStream, restoreRateLimiter, restoreRateLimitingTimeInNanos::inc);
+                }
+            }.restore(snapshotFiles, store);
         } catch (Exception e) {
             throw new IndexShardRestoreFailedException(shardId, "failed to restore snapshot [" + snapshotId + "]", e);
         }
@@ -876,8 +1001,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
     @Override
     public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId shardId) {
-        Context context = new Context(snapshotId, indexId, shardId);
-        BlobStoreIndexShardSnapshot snapshot = context.loadSnapshot();
+        BlobStoreIndexShardSnapshot snapshot = loadShardSnapshot(shardContainer(indexId, shardId), snapshotId);
         return IndexShardSnapshotStatus.newDone(snapshot.startTime(), snapshot.time(),
             snapshot.incrementalFileCount(), snapshot.totalFileCount(),
             snapshot.incrementalSize(), snapshot.totalSize());
@@ -923,458 +1047,230 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }
 
     /**
-     * Context for snapshot/restore operations
+     * Delete shard snapshot
      */
-    private class Context {
-
-        protected final SnapshotId snapshotId;
-
-        protected final ShardId shardId;
-
-        protected final BlobContainer blobContainer;
-
-        Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId) {
-            this(snapshotId, indexId, shardId, shardId);
+    private void deleteShardSnapshot(IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) {
+        final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
+        final Map<String, BlobMetaData> blobs;
+        try {
+            blobs = shardContainer.listBlobs();
+        } catch (IOException e) {
+            throw new IndexShardSnapshotException(snapshotShardId, "Failed to list content of shard directory", e);
         }
 
-        Context(SnapshotId snapshotId, IndexId indexId, ShardId shardId, ShardId snapshotShardId) {
-            this.snapshotId = snapshotId;
-            this.shardId = shardId;
-            blobContainer = shardContainer(indexId, snapshotShardId);
+        Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs, shardContainer);
+        BlobStoreIndexShardSnapshots snapshots = tuple.v1();
+        int fileListGeneration = tuple.v2();
+
+        try {
+            indexShardSnapshotFormat.delete(shardContainer, snapshotId.getUUID());
+        } catch (IOException e) {
+            logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", snapshotShardId, snapshotId), e);
         }
 
-        /**
-         * Delete shard snapshot
-         */
-        public void delete() {
-            final Map<String, BlobMetaData> blobs;
-            try {
-                blobs = blobContainer.listBlobs();
-            } catch (IOException e) {
-                throw new IndexShardSnapshotException(shardId, "Failed to list content of gateway", e);
-            }
-
-            Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
-            BlobStoreIndexShardSnapshots snapshots = tuple.v1();
-            int fileListGeneration = tuple.v2();
-
-            try {
-                indexShardSnapshotFormat.delete(blobContainer, snapshotId.getUUID());
-            } catch (IOException e) {
-                logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", shardId, snapshotId), e);
-            }
-
-            // Build a list of snapshots that should be preserved
-            List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
-            for (SnapshotFiles point : snapshots) {
-                if (!point.snapshot().equals(snapshotId.getName())) {
-                    newSnapshotsList.add(point);
-                }
-            }
-            // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
-            finalize(newSnapshotsList, fileListGeneration + 1, blobs, "snapshot deletion [" + snapshotId + "]");
-        }
-
-        /**
-         * Loads information about shard snapshot
-         */
-        BlobStoreIndexShardSnapshot loadSnapshot() {
-            try {
-                return indexShardSnapshotFormat.read(blobContainer, snapshotId.getUUID());
-            } catch (IOException ex) {
-                throw new SnapshotException(metadata.name(), snapshotId, "failed to read shard snapshot file for " + shardId, ex);
-            }
-        }
-
-        /**
-         * Writes a new index file for the shard and removes all unreferenced files from the repository.
-         *
-         * We need to be really careful in handling index files in case of failures to make sure we don't
-         * have index file that points to files that were deleted.
-         *
-         * @param snapshots          list of active snapshots in the container
-         * @param fileListGeneration the generation number of the snapshot index file
-         * @param blobs              list of blobs in the container
-         * @param reason             a reason explaining why the shard index file is written
-         */
-        protected void finalize(final List<SnapshotFiles> snapshots,
-                                final int fileListGeneration,
-                                final Map<String, BlobMetaData> blobs,
-                                final String reason) {
-            final String indexGeneration = Integer.toString(fileListGeneration);
-            try {
-                final List<String> blobsToDelete;
-                if (snapshots.isEmpty()) {
-                    // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
-                    blobsToDelete = new ArrayList<>(blobs.keySet());
-                } else {
-                    final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
-                    indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, blobContainer, indexGeneration);
-                    // Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
-                    blobsToDelete = blobs.keySet().stream().filter(blob ->
-                        blob.startsWith(SNAPSHOT_INDEX_PREFIX)
-                            || blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
-                            || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
-                }
-                try {
-                    blobContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
-                } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
-                        snapshotId, shardId), e);
-                }
-            } catch (IOException e) {
-                String message =
-                    "Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]";
-                throw new IndexShardSnapshotFailedException(shardId, message, e);
-            }
-        }
-
-        /**
-         * Loads all available snapshots in the repository
-         *
-         * @param blobs list of blobs in repository
-         * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
-         */
-        protected Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs) {
-            int latest = -1;
-            Set<String> blobKeys = blobs.keySet();
-            for (String name : blobKeys) {
-                if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
-                    try {
-                        int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
-                        if (gen > latest) {
-                            latest = gen;
-                        }
-                    } catch (NumberFormatException ex) {
-                        logger.warn("failed to parse index file name [{}]", name);
-                    }
-                }
-            }
-            if (latest >= 0) {
-                try {
-                    final BlobStoreIndexShardSnapshots shardSnapshots =
-                        indexShardSnapshotsFormat.read(blobContainer, Integer.toString(latest));
-                    return new Tuple<>(shardSnapshots, latest);
-                } catch (IOException e) {
-                    final String file = SNAPSHOT_INDEX_PREFIX + latest;
-                    logger.warn(() -> new ParameterizedMessage("failed to read index file [{}]", file), e);
-                }
-            } else if (blobKeys.isEmpty() == false) {
-                logger.warn("Could not find a readable index-N file in a non-empty shard snapshot directory [{}]", blobContainer.path());
-            }
-
-            // We couldn't load the index file - falling back to loading individual snapshots
-            List<SnapshotFiles> snapshots = new ArrayList<>();
-            for (String name : blobKeys) {
-                try {
-                    BlobStoreIndexShardSnapshot snapshot = null;
-                    if (name.startsWith(SNAPSHOT_PREFIX)) {
-                        snapshot = indexShardSnapshotFormat.readBlob(blobContainer, name);
-                    }
-                    if (snapshot != null) {
-                        snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
-                    }
-                } catch (IOException e) {
-                    logger.warn(() -> new ParameterizedMessage("failed to read commit point [{}]", name), e);
-                }
-            }
-            return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), latest);
-        }
-    }
-
-    /**
-     * Context for snapshot operations
-     */
-    private class SnapshotContext extends Context {
-
-        private final Store store;
-        private final IndexShardSnapshotStatus snapshotStatus;
-        private final long startTime;
-
-        /**
-         * Constructs new context
-         *
-         * @param store          store to be snapshotted
-         * @param snapshotId     snapshot id
-         * @param indexId        the id of the index being snapshotted
-         * @param snapshotStatus snapshot status to report progress
-         */
-        SnapshotContext(Store store, SnapshotId snapshotId, IndexId indexId, IndexShardSnapshotStatus snapshotStatus, long startTime) {
-            super(snapshotId, indexId, store.shardId());
-            this.snapshotStatus = snapshotStatus;
-            this.store = store;
-            this.startTime = startTime;
-        }
-
-        /**
-         * Create snapshot from index commit point
-         *
-         * @param snapshotIndexCommit snapshot commit point
-         */
-        public void snapshot(final IndexCommit snapshotIndexCommit) {
-            logger.debug("[{}] [{}] snapshot to [{}] ...", shardId, snapshotId, metadata.name());
-
-            final Map<String, BlobMetaData> blobs;
-            try {
-                blobs = blobContainer.listBlobs();
-            } catch (IOException e) {
-                throw new IndexShardSnapshotFailedException(shardId, "failed to list blobs", e);
-            }
-
-            Tuple<BlobStoreIndexShardSnapshots, Integer> tuple = buildBlobStoreIndexShardSnapshots(blobs);
-            BlobStoreIndexShardSnapshots snapshots = tuple.v1();
-            int fileListGeneration = tuple.v2();
-
-            if (snapshots.snapshots().stream().anyMatch(sf -> sf.snapshot().equals(snapshotId.getName()))) {
-                throw new IndexShardSnapshotFailedException(shardId,
-                    "Duplicate snapshot name [" + snapshotId.getName() + "] detected, aborting");
-            }
-
-            final List<BlobStoreIndexShardSnapshot.FileInfo> indexCommitPointFiles = new ArrayList<>();
-
-            store.incRef();
-            int indexIncrementalFileCount = 0;
-            int indexTotalNumberOfFiles = 0;
-            long indexIncrementalSize = 0;
-            long indexTotalFileCount = 0;
-            try {
-                ArrayList<BlobStoreIndexShardSnapshot.FileInfo> filesToSnapshot = new ArrayList<>();
-                final Store.MetadataSnapshot metadata;
-                // TODO apparently we don't use the MetadataSnapshot#.recoveryDiff(...) here but we should
-                final Collection<String> fileNames;
-                try {
-                    logger.trace("[{}] [{}] Loading store metadata using index commit [{}]", shardId, snapshotId, snapshotIndexCommit);
-                    metadata = store.getMetadata(snapshotIndexCommit);
-                    fileNames = snapshotIndexCommit.getFileNames();
-                } catch (IOException e) {
-                    throw new IndexShardSnapshotFailedException(shardId, "Failed to get store file metadata", e);
-                }
-                for (String fileName : fileNames) {
-                    if (snapshotStatus.isAborted()) {
-                        logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
-                        throw new IndexShardSnapshotFailedException(shardId, "Aborted");
-                    }
-
-                    logger.trace("[{}] [{}] Processing [{}]", shardId, snapshotId, fileName);
-                    final StoreFileMetaData md = metadata.get(fileName);
-                    BlobStoreIndexShardSnapshot.FileInfo existingFileInfo = null;
-                    List<BlobStoreIndexShardSnapshot.FileInfo> filesInfo = snapshots.findPhysicalIndexFiles(fileName);
-                    if (filesInfo != null) {
-                        for (BlobStoreIndexShardSnapshot.FileInfo fileInfo : filesInfo) {
-                            if (fileInfo.isSame(md) && snapshotFileExistsInBlobs(fileInfo, blobs)) {
-                                // a commit point file with the same name, size and checksum was already copied to repository
-                                // we will reuse it for this snapshot
-                                existingFileInfo = fileInfo;
-                                break;
-                            }
-                        }
-                    }
-
-                    indexTotalFileCount += md.length();
-                    indexTotalNumberOfFiles++;
-
-                    if (existingFileInfo == null) {
-                        indexIncrementalFileCount++;
-                        indexIncrementalSize += md.length();
-                        // create a new FileInfo
-                        BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo =
-                            new BlobStoreIndexShardSnapshot.FileInfo(DATA_BLOB_PREFIX + UUIDs.randomBase64UUID(), md, chunkSize());
-                        indexCommitPointFiles.add(snapshotFileInfo);
-                        filesToSnapshot.add(snapshotFileInfo);
-                    } else {
-                        indexCommitPointFiles.add(existingFileInfo);
-                    }
-                }
-
-                snapshotStatus.moveToStarted(startTime, indexIncrementalFileCount,
-                    indexTotalNumberOfFiles, indexIncrementalSize, indexTotalFileCount);
-
-                for (BlobStoreIndexShardSnapshot.FileInfo snapshotFileInfo : filesToSnapshot) {
-                    try {
-                        snapshotFile(snapshotFileInfo);
-                    } catch (IOException e) {
-                        throw new IndexShardSnapshotFailedException(shardId, "Failed to perform snapshot (index files)", e);
-                    }
-                }
-            } finally {
-                store.decRef();
-            }
-
-            final IndexShardSnapshotStatus.Copy lastSnapshotStatus = snapshotStatus.moveToFinalize(snapshotIndexCommit.getGeneration());
-
-            // now create and write the commit point
-            final BlobStoreIndexShardSnapshot snapshot = new BlobStoreIndexShardSnapshot(snapshotId.getName(),
-                lastSnapshotStatus.getIndexVersion(),
-                indexCommitPointFiles,
-                lastSnapshotStatus.getStartTime(),
-                // snapshotStatus.startTime() is assigned on the same machine,
-                // so it's safe to use with VLong
-                System.currentTimeMillis() - lastSnapshotStatus.getStartTime(),
-                lastSnapshotStatus.getIncrementalFileCount(),
-                lastSnapshotStatus.getIncrementalSize()
-            );
-
-            //TODO: The time stored in snapshot doesn't include cleanup time.
-            logger.trace("[{}] [{}] writing shard snapshot file", shardId, snapshotId);
-            try {
-                indexShardSnapshotFormat.write(snapshot, blobContainer, snapshotId.getUUID());
-            } catch (IOException e) {
-                throw new IndexShardSnapshotFailedException(shardId, "Failed to write commit point", e);
-            }
-
-            // delete all files that are not referenced by any commit point
-            // build a new BlobStoreIndexShardSnapshot, that includes this one and all the saved ones
-            List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
-            newSnapshotsList.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
-            for (SnapshotFiles point : snapshots) {
-                newSnapshotsList.add(point);
-            }
-            // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
-            finalize(newSnapshotsList, fileListGeneration + 1, blobs, "snapshot creation [" + snapshotId + "]");
-            snapshotStatus.moveToDone(System.currentTimeMillis());
-        }
-
-        /**
-         * Snapshot individual file
-         *
-         * @param fileInfo file to be snapshotted
-         */
-        private void snapshotFile(final BlobStoreIndexShardSnapshot.FileInfo fileInfo) throws IOException {
-            final String file = fileInfo.physicalName();
-            try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
-                for (int i = 0; i < fileInfo.numberOfParts(); i++) {
-                    final long partBytes = fileInfo.partBytes(i);
-
-                    final InputStreamIndexInput inputStreamIndexInput = new InputStreamIndexInput(indexInput, partBytes);
-                    InputStream inputStream = inputStreamIndexInput;
-                    if (snapshotRateLimiter != null) {
-                        inputStream = new RateLimitingInputStream(inputStreamIndexInput, snapshotRateLimiter,
-                            snapshotRateLimitingTimeInNanos::inc);
-                    }
-                    inputStream = new AbortableInputStream(inputStream, fileInfo.physicalName());
-                    blobContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true);
-                }
-                Store.verify(indexInput);
-                snapshotStatus.addProcessedFile(fileInfo.length());
-            } catch (Exception t) {
-                failStoreIfCorrupted(t);
-                snapshotStatus.addProcessedFile(0);
-                throw t;
-            }
-        }
-
-        private void failStoreIfCorrupted(Exception e) {
-            if (Lucene.isCorruptionException(e)) {
-                try {
-                    store.markStoreCorrupted((IOException) e);
-                } catch (IOException inner) {
-                    inner.addSuppressed(e);
-                    logger.warn("store cannot be marked as corrupted", inner);
-                }
-            }
-        }
-
-        /**
-         * Checks if snapshot file already exists in the list of blobs
-         *
-         * @param fileInfo file to check
-         * @param blobs    list of blobs
-         * @return true if file exists in the list of blobs
-         */
-        private boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map<String, BlobMetaData> blobs) {
-            BlobMetaData blobMetaData = blobs.get(fileInfo.name());
-            if (blobMetaData != null) {
-                return blobMetaData.length() == fileInfo.length();
-            } else if (blobs.containsKey(fileInfo.partName(0))) {
-                // multi part file sum up the size and check
-                int part = 0;
-                long totalSize = 0;
-                while (true) {
-                    blobMetaData = blobs.get(fileInfo.partName(part++));
-                    if (blobMetaData == null) {
-                        break;
-                    }
-                    totalSize += blobMetaData.length();
-                }
-                return totalSize == fileInfo.length();
-            }
-            // no file, not exact and not multipart
-            return false;
-        }
-
-        private class AbortableInputStream extends FilterInputStream {
-            private final String fileName;
-
-            AbortableInputStream(InputStream delegate, String fileName) {
-                super(delegate);
-                this.fileName = fileName;
-            }
-
-            @Override
-            public int read() throws IOException {
-                checkAborted();
-                return in.read();
-            }
-
-            @Override
-            public int read(byte[] b, int off, int len) throws IOException {
-                checkAborted();
-                return in.read(b, off, len);
-            }
-
-            private void checkAborted() {
-                if (snapshotStatus.isAborted()) {
-                    logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId, snapshotId, fileName);
-                    throw new IndexShardSnapshotFailedException(shardId, "Aborted");
-                }
-            }
-        }
+        // Build a list of snapshots that should be preserved
+        List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
+        for (SnapshotFiles point : snapshots) {
+            if (!point.snapshot().equals(snapshotId.getName())) {
+                newSnapshotsList.add(point);
+            }
+        }
+        // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
+        finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot deletion [" + snapshotId + "]", shardContainer,
+            snapshotShardId, snapshotId);
     }
 
-    private static final class PartSliceStream extends SlicedInputStream {
-
-        private final BlobContainer container;
-        private final BlobStoreIndexShardSnapshot.FileInfo info;
-
-        PartSliceStream(BlobContainer container, BlobStoreIndexShardSnapshot.FileInfo info) {
-            super(info.numberOfParts());
-            this.info = info;
-            this.container = container;
-        }
-
-        @Override
-        protected InputStream openSlice(long slice) throws IOException {
-            return container.readBlob(info.partName(slice));
+    /**
+     * Loads information about shard snapshot
+     */
+    private BlobStoreIndexShardSnapshot loadShardSnapshot(BlobContainer shardContainer, SnapshotId snapshotId) {
+        try {
+            return indexShardSnapshotFormat.read(shardContainer, snapshotId.getUUID());
+        } catch (IOException ex) {
+            throw new SnapshotException(metadata.name(), snapshotId,
+                "failed to read shard snapshot file for [" + shardContainer.path() + ']', ex);
         }
     }
 
     /**
-     * Context for restore operations
+     * Writes a new index file for the shard and removes all unreferenced files from the repository.
+     *
+     * We need to be really careful in handling index files in case of failures to make sure we don't
+     * have index file that points to files that were deleted.
+     *
+     * @param snapshots          list of active snapshots in the container
+     * @param fileListGeneration the generation number of the current snapshot index file
+     * @param blobs              list of blobs in the container
+     * @param reason             a reason explaining why the shard index file is written
      */
-    private class RestoreContext extends FileRestoreContext {
+    private void finalizeShard(List<SnapshotFiles> snapshots, int fileListGeneration, Map<String, BlobMetaData> blobs,
+                               String reason, BlobContainer shardContainer, ShardId shardId, SnapshotId snapshotId) {
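+        // the caller passes in the generation it read; the rewritten shard index is written under the next one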
+        final String indexGeneration = Integer.toString(fileListGeneration + 1);
+        try {
+            final List<String> blobsToDelete;
+            if (snapshots.isEmpty()) {
+                // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
+                blobsToDelete = new ArrayList<>(blobs.keySet());
+            } else {
+                final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
+                indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+                // Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
+                blobsToDelete = blobs.keySet().stream().filter(blob ->
+                    blob.startsWith(SNAPSHOT_INDEX_PREFIX)
+                        || blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
+                        || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
+            }
+            try {
+                shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
+                    snapshotId, shardId), e);
+            }
+        } catch (IOException e) {
+            String message =
+                "Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]";
+            throw new IndexShardSnapshotFailedException(shardId, message, e);
+        }
+    }
 
-        private final BlobContainer blobContainer;
-
-        /**
-         * Constructs new restore context
-         * @param shardId shard id to restore into
-         * @param snapshotId snapshot id
-         * @param recoveryState recovery state to report progress
-         * @param blobContainer the blob container to read the files from
-         */
-        RestoreContext(ShardId shardId, SnapshotId snapshotId, RecoveryState recoveryState, BlobContainer blobContainer) {
-            super(metadata.name(), shardId, snapshotId, recoveryState, BUFFER_SIZE);
-            this.blobContainer = blobContainer;
+    /**
+     * Loads all available snapshots in the repository
+     *
+     * @param blobs list of blobs in repository
+     * @return tuple of BlobStoreIndexShardSnapshots and the last snapshot index generation
+     */
+    private Tuple<BlobStoreIndexShardSnapshots, Integer> buildBlobStoreIndexShardSnapshots(Map<String, BlobMetaData> blobs,
+                                                                                           BlobContainer shardContainer) {
+        int latest = -1;
+        Set<String> blobKeys = blobs.keySet();
+        for (String name : blobKeys) {
+            if (name.startsWith(SNAPSHOT_INDEX_PREFIX)) {
+                try {
+                    int gen = Integer.parseInt(name.substring(SNAPSHOT_INDEX_PREFIX.length()));
+                    if (gen > latest) {
+                        latest = gen;
+                    }
+                } catch (NumberFormatException ex) {
+                    logger.warn("failed to parse index file name [{}]", name);
+                }
+            }
+        }
+        if (latest >= 0) {
+            try {
+                final BlobStoreIndexShardSnapshots shardSnapshots =
+                    indexShardSnapshotsFormat.read(shardContainer, Integer.toString(latest));
+                return new Tuple<>(shardSnapshots, latest);
+            } catch (IOException e) {
+                final String file = SNAPSHOT_INDEX_PREFIX + latest;
+                logger.warn(() -> new ParameterizedMessage("failed to read index file [{}]", file), e);
+            }
+        } else if (blobKeys.isEmpty() == false) {
+            logger.warn("Could not find a readable index-N file in a non-empty shard snapshot directory [{}]", shardContainer.path());
         }
 
-        @Override
-        protected InputStream fileInputStream(BlobStoreIndexShardSnapshot.FileInfo fileInfo) {
-            if (restoreRateLimiter == null) {
-                return new PartSliceStream(blobContainer, fileInfo);
-            } else {
-                RateLimitingInputStream.Listener listener = restoreRateLimitingTimeInNanos::inc;
-                return new RateLimitingInputStream(new PartSliceStream(blobContainer, fileInfo), restoreRateLimiter, listener);
+        // We couldn't load the index file - falling back to loading individual snapshots
+        List<SnapshotFiles> snapshots = new ArrayList<>();
+        for (String name : blobKeys) {
+            try {
+                BlobStoreIndexShardSnapshot snapshot = null;
+                if (name.startsWith(SNAPSHOT_PREFIX)) {
+                    snapshot = indexShardSnapshotFormat.readBlob(shardContainer, name);
+                }
+                if (snapshot != null) {
+                    snapshots.add(new SnapshotFiles(snapshot.snapshot(), snapshot.indexFiles()));
+                }
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("Failed to read blob [{}]", name), e);
+            }
+        }
+        return new Tuple<>(new BlobStoreIndexShardSnapshots(snapshots), latest);
+    }
+
+    /**
+     * Snapshot individual file
+     * @param fileInfo file to be snapshotted
+     */
+    private void snapshotFile(BlobStoreIndexShardSnapshot.FileInfo fileInfo, IndexId indexId, ShardId shardId, SnapshotId snapshotId,
+                              IndexShardSnapshotStatus snapshotStatus, Store store) throws IOException {
+        final BlobContainer shardContainer = shardContainer(indexId, shardId);
+        final String file = fileInfo.physicalName();
+        try (IndexInput indexInput = store.openVerifyingInput(file, IOContext.READONCE, fileInfo.metadata())) {
+            for (int i = 0; i < fileInfo.numberOfParts(); i++) {
+                final long partBytes = fileInfo.partBytes(i);
+
+                InputStream inputStream = new InputStreamIndexInput(indexInput, partBytes);
+                if (snapshotRateLimiter != null) {
+                    inputStream = new RateLimitingInputStream(inputStream, snapshotRateLimiter,
+                        snapshotRateLimitingTimeInNanos::inc);
+                }
+                // Make reads abortable by mutating the snapshotStatus object
+                inputStream = new FilterInputStream(inputStream) {
+                    @Override
+                    public int read() throws IOException {
+                        checkAborted();
+                        return super.read();
+                    }
+
+                    @Override
+                    public int read(byte[] b, int off, int len) throws IOException {
+                        checkAborted();
+                        return super.read(b, off, len);
+                    }
+
+                    private void checkAborted() {
+                        if (snapshotStatus.isAborted()) {
+                            logger.debug("[{}] [{}] Aborted on the file [{}], exiting", shardId,
+                                snapshotId, fileInfo.physicalName());
+                            throw new IndexShardSnapshotFailedException(shardId, "Aborted");
+                        }
+                    }
+                };
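+                // part names embed a random UUID (see the FileInfo creation in snapshotShard), so
+                // the failIfAlreadyExists=true flag below should only ever trip on a bug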
+                shardContainer.writeBlob(fileInfo.partName(i), inputStream, partBytes, true);
+            }
+            Store.verify(indexInput);
+            snapshotStatus.addProcessedFile(fileInfo.length());
+        } catch (Exception t) {
+            failStoreIfCorrupted(store, t);
+            snapshotStatus.addProcessedFile(0);
+            throw t;
+        }
+    }
+
+    private static void failStoreIfCorrupted(Store store, Exception e) {
+        if (Lucene.isCorruptionException(e)) {
+            try {
+                store.markStoreCorrupted((IOException) e);
+            } catch (IOException inner) {
+                inner.addSuppressed(e);
+                logger.warn("store cannot be marked as corrupted", inner);
+            }
+        }
+    }
+
+    /**
+     * Checks if snapshot file already exists in the list of blobs
+     * @param fileInfo file to check
+     * @param blobs    list of blobs
+     * @return true if file exists in the list of blobs
+     */
+    private static boolean snapshotFileExistsInBlobs(BlobStoreIndexShardSnapshot.FileInfo fileInfo, Map<String, BlobMetaData> blobs) {
+        BlobMetaData blobMetaData = blobs.get(fileInfo.name());
+        if (blobMetaData != null) {
+            return blobMetaData.length() == fileInfo.length();
+        } else if (blobs.containsKey(fileInfo.partName(0))) {
+            // multi part file sum up the size and check
+            int part = 0;
+            long totalSize = 0;
+            while (true) {
+                blobMetaData = blobs.get(fileInfo.partName(part++));
+                if (blobMetaData == null) {
+                    break;
+                }
+                totalSize += blobMetaData.length();
+            }
+            return totalSize == fileInfo.length();
+        }
+        // no file, not exact and not multipart
+        return false;
+    }
 }
diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
index 1563facd335..65aac9c10fe 100644
--- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
+++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java
@@ -285,7 +285,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
             request.partial(),
             State.INIT,
             snapshotIndices,
-            System.currentTimeMillis(),
+            threadPool.absoluteTimeInMillis(),
             repositoryData.getGenId(),
             null,
             request.userMetadata());
@@ -1168,7 +1168,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
         // add the snapshot deletion to the cluster state
         SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
             snapshot,
-            System.currentTimeMillis(),
+            threadPool.absoluteTimeInMillis(),
            repositoryStateId
        );
        if (deletionsInProgress != null) {
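A note on the System.currentTimeMillis() -> threadPool.absoluteTimeInMillis() substitutions that recur in both files: routing wall-clock reads through the ThreadPool takes timestamps from a component the node already owns and that tests can stub for deterministic results. A minimal sketch of the underlying injectable-clock pattern (the RepositoryClock name and its wiring are illustrative, not Elasticsearch API):

    import java.util.function.LongSupplier;

    final class RepositoryClock {
        // in production something like threadPool::absoluteTimeInMillis; in tests any fixed supplier
        private final LongSupplier millisSupplier;

        RepositoryClock(LongSupplier millisSupplier) {
            this.millisSupplier = millisSupplier;
        }

        long absoluteTimeInMillis() {
            return millisSupplier.getAsLong();
        }
    }

    // usage in a test: new RepositoryClock(() -> 1_500_000_000_000L) gives every
    // snapshot the same, repeatable start and end time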