Fixing a few spots where NOOP tasks on the snapshot pool were created needlessly. Especially when it comes to mixed master+data nodes and concurrent snapshots these hurt delete operation performance needlessly.
This commit is contained in:
parent
b680d3fb29
commit
6710104673
|
@ -668,7 +668,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
|
||||
final ActionListener<Void> afterCleanupsListener =
|
||||
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(updatedRepoData)), 2);
|
||||
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
|
||||
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, updatedRepoData, afterCleanupsListener);
|
||||
asyncCleanupUnlinkedShardLevelBlobs(repositoryData, snapshotIds, writeShardMetaDataAndComputeDeletesStep.result(),
|
||||
afterCleanupsListener);
|
||||
}, listener::onFailure);
|
||||
|
@ -679,7 +679,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
// Run unreferenced blobs cleanup in parallel to shard-level snapshot deletion
|
||||
final ActionListener<Void> afterCleanupsListener =
|
||||
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(newRepoData)), 2);
|
||||
asyncCleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
|
||||
cleanupUnlinkedRootAndIndicesBlobs(snapshotIds, foundIndices, rootBlobs, newRepoData, afterCleanupsListener);
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> writeMetaAndComputeDeletesStep = new StepListener<>();
|
||||
writeUpdatedShardMetaDataAndComputeDeletes(snapshotIds, repositoryData, false, writeMetaAndComputeDeletesStep);
|
||||
writeMetaAndComputeDeletesStep.whenComplete(deleteResults ->
|
||||
|
@ -689,22 +689,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
}
|
||||
|
||||
private void asyncCleanupUnlinkedRootAndIndicesBlobs(Collection<SnapshotId> deletedSnapshots, Map<String, BlobContainer> foundIndices,
|
||||
Map<String, BlobMetadata> rootBlobs, RepositoryData updatedRepoData,
|
||||
ActionListener<Void> listener) {
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
|
||||
listener,
|
||||
l -> cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(l, ignored -> null))));
|
||||
private void cleanupUnlinkedRootAndIndicesBlobs(Collection<SnapshotId> deletedSnapshots, Map<String, BlobContainer> foundIndices,
|
||||
Map<String, BlobMetadata> rootBlobs, RepositoryData updatedRepoData,
|
||||
ActionListener<Void> listener) {
|
||||
cleanupStaleBlobs(deletedSnapshots, foundIndices, rootBlobs, updatedRepoData, ActionListener.map(listener, ignored -> null));
|
||||
}
|
||||
|
||||
private void asyncCleanupUnlinkedShardLevelBlobs(RepositoryData oldRepositoryData, Collection<SnapshotId> snapshotIds,
|
||||
Collection<ShardSnapshotMetaDeleteResult> deleteResults,
|
||||
ActionListener<Void> listener) {
|
||||
final List<String> filesToDelete = resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults);
|
||||
if (filesToDelete.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(
|
||||
listener,
|
||||
l -> {
|
||||
try {
|
||||
deleteFromContainer(blobContainer(), resolveFilesToDelete(oldRepositoryData, snapshotIds, deleteResults));
|
||||
deleteFromContainer(blobContainer(), filesToDelete);
|
||||
l.onResponse(null);
|
||||
} catch (Exception e) {
|
||||
logger.warn(
|
||||
|
@ -848,13 +851,23 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}, listener::onFailure), 2);
|
||||
|
||||
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
|
||||
executor.execute(ActionRunnable.supply(groupedListener, () -> {
|
||||
List<String> deletedBlobs = cleanupStaleRootFiles(deletedSnapshots, staleRootBlobs(newRepoData, rootBlobs.keySet()));
|
||||
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
|
||||
}));
|
||||
final List<String> staleRootBlobs = staleRootBlobs(newRepoData, rootBlobs.keySet());
|
||||
if (staleRootBlobs.isEmpty()) {
|
||||
groupedListener.onResponse(DeleteResult.ZERO);
|
||||
} else {
|
||||
executor.execute(ActionRunnable.supply(groupedListener, () -> {
|
||||
List<String> deletedBlobs =
|
||||
cleanupStaleRootFiles(newRepoData.getGenId() - 1, deletedSnapshots, staleRootBlobs);
|
||||
return new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum());
|
||||
}));
|
||||
}
|
||||
|
||||
final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
|
||||
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
|
||||
if (foundIndices.keySet().equals(survivingIndexIds)) {
|
||||
groupedListener.onResponse(DeleteResult.ZERO);
|
||||
} else {
|
||||
executor.execute(ActionRunnable.supply(groupedListener, () -> cleanupStaleIndices(foundIndices, survivingIndexIds)));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -896,7 +909,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
}
|
||||
|
||||
// Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData
|
||||
private List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
|
||||
private static List<String> staleRootBlobs(RepositoryData repositoryData, Set<String> rootBlobNames) {
|
||||
final Set<String> allSnapshotIds =
|
||||
repositoryData.getSnapshotIds().stream().map(SnapshotId::getUUID).collect(Collectors.toSet());
|
||||
return rootBlobNames.stream().filter(
|
||||
|
@ -925,7 +938,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
).collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private List<String> cleanupStaleRootFiles(Collection<SnapshotId> deletedSnapshots, List<String> blobsToDelete) {
|
||||
private List<String> cleanupStaleRootFiles(long previousGeneration, Collection<SnapshotId> deletedSnapshots,
|
||||
List<String> blobsToDelete) {
|
||||
if (blobsToDelete.isEmpty()) {
|
||||
return blobsToDelete;
|
||||
}
|
||||
|
@ -936,7 +950,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
// delete would also log a confusing INFO message about "stale blobs".
|
||||
final Set<String> blobNamesToIgnore = deletedSnapshots.stream().flatMap(
|
||||
snapshotId -> Stream.of(GLOBAL_METADATA_FORMAT.blobName(snapshotId.getUUID()),
|
||||
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()))).collect(Collectors.toSet());
|
||||
SNAPSHOT_FORMAT.blobName(snapshotId.getUUID()), INDEX_FILE_PREFIX + previousGeneration))
|
||||
.collect(Collectors.toSet());
|
||||
final List<String> blobsToLog = blobsToDelete.stream().filter(b -> blobNamesToIgnore.contains(b) == false)
|
||||
.collect(Collectors.toList());
|
||||
if (blobsToLog.isEmpty() == false) {
|
||||
|
@ -1011,7 +1026,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
final boolean writeIndexGens = SnapshotsService.useIndexGenerations(repositoryMetaVersion);
|
||||
|
||||
final StepListener<RepositoryData> repoDataListener = new StepListener<>();
|
||||
executor.execute(ActionRunnable.wrap(repoDataListener, this::getRepositoryData));
|
||||
getRepositoryData(repoDataListener);
|
||||
repoDataListener.whenComplete(existingRepositoryData -> {
|
||||
|
||||
final Map<IndexId, String> indexMetas;
|
||||
|
|
Loading…
Reference in New Issue