diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java index ecbd5c823a4..f9ee7c3fe40 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/BlobStoreRepository.java @@ -107,6 +107,7 @@ import java.util.Set; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName; @@ -429,12 +430,43 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp ActionListener listener) throws IOException { final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId); writeIndexGen(updatedRepositoryData, repositoryStateId); + final ActionListener afterCleanupsListener = + new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2); + + // Run unreferenced blobs cleanup in parallel to snapshot deletion + threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener, + l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null)))); + deleteIndices( updatedRepositoryData, repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId), snapshotId, - ActionListener.delegateFailure(listener, - (l, v) -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null)))); + ActionListener.runAfter( + ActionListener.wrap( + deleteResults -> { + // Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths) + // has been updated we can execute the delete operations for all blobs that have become unreferenced as a result + final String basePath = basePath().buildAsString(); + final int basePathLen = basePath.length(); + blobContainer().deleteBlobsIgnoringIfNotExists( + Stream.concat( + deleteResults.stream().flatMap(shardResult -> { + final String shardPath = + shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString(); + return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob); + }), + deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId -> + indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID())) + ).map(absolutePath -> { + assert absolutePath.startsWith(basePath); + return absolutePath.substring(basePathLen); + }).collect(Collectors.toList())); + }, + // Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request + e -> logger.warn( + () -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)), + () -> afterCleanupsListener.onResponse(null)) + ); } /** @@ -593,90 +625,58 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp * @param listener Listener to invoke when finished */ private void deleteIndices(RepositoryData repositoryData, List indices, SnapshotId snapshotId, - ActionListener listener) { + ActionListener> listener) { + if (indices.isEmpty()) { - listener.onResponse(null); + listener.onResponse(Collections.emptyList()); return; } - // listener to complete once all shards folders affected by this delete have been added new metadata blobs without this snapshot - final StepListener> deleteFromMetaListener = new StepListener<>(); // Listener that flattens out the delete results for each index final ActionListener> deleteIndexMetaDataListener = new GroupedActionListener<>( - ActionListener.map(deleteFromMetaListener, - results -> results.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); + ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size()); final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT); for (IndexId indexId : indices) { executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener, deleteIdxMetaListener -> { - IndexMetaData indexMetaData = null; + final IndexMetaData indexMetaData; try { indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId); } catch (Exception ex) { logger.warn(() -> new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex); - } - deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId); - if (indexMetaData != null) { - final int shardCount = indexMetaData.getNumberOfShards(); - assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]"; - // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index - final ActionListener allShardsListener = - new GroupedActionListener<>(deleteIdxMetaListener, shardCount); - final Index index = indexMetaData.getIndex(); - for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { - final ShardId shard = new ShardId(index, shardId); - executor.execute(new AbstractRunnable() { - @Override - protected void doRun() throws Exception { - allShardsListener.onResponse( - deleteShardSnapshot(repositoryData, indexId, shard, snapshotId)); - } - - @Override - public void onFailure(Exception ex) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", - snapshotId, indexId.getName(), shard.id()), ex); - // Just passing null here to count down the listener instead of failing it, the stale data left behind - // here will be retried in the next delete or repository cleanup - allShardsListener.onResponse(null); - } - }); - } - } else { // Just invoke the listener without any shard generations to count it down, this index will be cleaned up // by the stale data cleanup in the end. deleteIdxMetaListener.onResponse(null); + return; + } + final int shardCount = indexMetaData.getNumberOfShards(); + assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]"; + // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index + final ActionListener allShardsListener = + new GroupedActionListener<>(deleteIdxMetaListener, shardCount); + final Index index = indexMetaData.getIndex(); + for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) { + final ShardId shard = new ShardId(index, shardId); + executor.execute(new AbstractRunnable() { + @Override + protected void doRun() throws Exception { + allShardsListener.onResponse( + deleteShardSnapshot(repositoryData, indexId, shard, snapshotId)); + } + + @Override + public void onFailure(Exception ex) { + logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]", + snapshotId, indexId.getName(), shard.id()), ex); + // Just passing null here to count down the listener instead of failing it, the stale data left behind + // here will be retried in the next delete or repository cleanup + allShardsListener.onResponse(null); + } + }); } })); } - - // Delete all the now unreferenced blobs in the shard paths - deleteFromMetaListener.whenComplete(newGens -> { - final String basePath = basePath().buildAsString(); - final int basePathLen = basePath.length(); - blobContainer().deleteBlobsIgnoringIfNotExists( - newGens.stream().flatMap(shardBlob -> { - final String shardPathAbs = shardContainer(shardBlob.indexId, shardBlob.shardId).path().buildAsString(); - assert shardPathAbs.startsWith(basePath); - final String pathToShard = shardPathAbs.substring(basePathLen); - return shardBlob.blobsToDelete.stream().map(blob -> pathToShard + blob); - }).collect(Collectors.toList()) - ); - listener.onResponse(null); - }, e -> { - logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e); - listener.onResponse(null); - }); - } - - private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) { - try { - indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID()); - } catch (IOException ex) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]", - snapshotId, indexId.getName()), ex); - } } @Override