Parallelize Repository Cleanup Actions (#46647) (#46714)

* Parallelize Repository Cleanup Actions

Deleting root blobs and unreferenced indices can safely happen in parallel;
there is no need to run the two operations sequentially when they already
preclude all other repository operations.
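
To picture the idea outside of Elasticsearch: the change turns the two cleanup actions into independent delete tasks whose per-task results are merged once both complete. The sketch below is a plain-JDK illustration only; it uses CompletableFuture and a fixed thread pool instead of the GroupedActionListener and SNAPSHOT thread pool the change itself relies on, and CleanupResult plus both task bodies are made up for the example.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for DeleteResult: number of blobs and bytes removed by one cleanup action.
record CleanupResult(long blobsDeleted, long bytesDeleted) {
    CleanupResult add(CleanupResult other) {
        return new CleanupResult(blobsDeleted + other.blobsDeleted, bytesDeleted + other.bytesDeleted);
    }
}

public class ParallelCleanupSketch {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        // The two delete tasks run concurrently; neither reads the other's output.
        CompletableFuture<CleanupResult> staleRootBlobs =
            CompletableFuture.supplyAsync(() -> new CleanupResult(3, 1_024), executor);
        CompletableFuture<CleanupResult> staleIndices =
            CompletableFuture.supplyAsync(() -> new CleanupResult(7, 4_096), executor);
        // Merge both results once the pair has finished, mirroring the GroupedActionListener
        // of size 2 that cleanupStaleBlobs uses in the diff below.
        CleanupResult combined = staleRootBlobs.thenCombine(staleIndices, CleanupResult::add).join();
        System.out.println("deleted " + combined.blobsDeleted() + " blobs, " + combined.bytesDeleted() + " bytes");
        executor.shutdown();
    }
}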
Armin Braun 2019-09-16 07:52:03 +02:00 committed by GitHub
parent 63eb0d9081
commit 2b85dcb201
1 changed file with 49 additions and 19 deletions


@@ -111,6 +111,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
@@ -425,10 +426,10 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         // Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
         final RepositoryData updatedRepositoryData;
         final Map<String, BlobContainer> foundIndices;
-        final Set<String> rootBlobs;
+        final Map<String, BlobMetaData> rootBlobs;
         try {
-            rootBlobs = blobContainer().listBlobs().keySet();
-            final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs));
+            rootBlobs = blobContainer().listBlobs();
+            final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
             updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
             // Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
             // delete an index that was created by another master node after writing this index-N blob.
@@ -455,16 +456,45 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     .map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
                     .orElse(Collections.emptyList()),
                 snapshotId,
-                ActionListener.map(listener, v -> {
-                    cleanupStaleIndices(foundIndices, survivingIndices.values().stream().map(IndexId::getId).collect(Collectors.toSet()));
-                    cleanupStaleRootFiles(
-                        staleRootBlobs(updatedRepositoryData, Sets.difference(rootBlobs, new HashSet<>(snapMetaFilesToDelete))));
-                    return null;
-                })
-            );
+                ActionListener.delegateFailure(listener,
+                    (l, v) -> cleanupStaleBlobs(foundIndices,
+                        Sets.difference(rootBlobs.keySet(), new HashSet<>(snapMetaFilesToDelete)).stream().collect(
+                            Collectors.toMap(Function.identity(), rootBlobs::get)),
+                        updatedRepositoryData, ActionListener.map(l, ignored -> null))));
         }
     }
 
+    /**
+     * Cleans up stale blobs directly under the repository root as well as all indices paths that aren't referenced by any existing
+     * snapshots. This method is only to be called directly after a new {@link RepositoryData} was written to the repository and with
+     * parameters {@code foundIndices}, {@code rootBlobs} that were collected before {@code newRepoData} was written.
+     *
+     * @param foundIndices all indices blob containers found in the repository before {@code newRepoData} was written
+     * @param rootBlobs    all blobs found directly under the repository root
+     * @param newRepoData  new repository data that was just written
+     * @param listener     listener to invoke with the combined {@link DeleteResult} of all blobs removed in this operation
+     */
+    private void cleanupStaleBlobs(Map<String, BlobContainer> foundIndices, Map<String, BlobMetaData> rootBlobs,
+                                   RepositoryData newRepoData, ActionListener<DeleteResult> listener) {
+        final GroupedActionListener<DeleteResult> groupedListener = new GroupedActionListener<>(ActionListener.wrap(deleteResults -> {
+            DeleteResult deleteResult = DeleteResult.ZERO;
+            for (DeleteResult result : deleteResults) {
+                deleteResult = deleteResult.add(result);
+            }
+            listener.onResponse(deleteResult);
+        }, listener::onFailure), 2);
+
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        executor.execute(ActionRunnable.wrap(groupedListener, l -> {
+            List<String> deletedBlobs = cleanupStaleRootFiles(staleRootBlobs(newRepoData, rootBlobs.keySet()));
+            l.onResponse(
+                new DeleteResult(deletedBlobs.size(), deletedBlobs.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()));
+        }));
+
+        final Set<String> survivingIndexIds = newRepoData.getIndices().values().stream().map(IndexId::getId).collect(Collectors.toSet());
+        executor.execute(ActionRunnable.wrap(groupedListener, l -> l.onResponse(cleanupStaleIndices(foundIndices, survivingIndexIds))));
+    }
+
     /**
      * Runs cleanup actions on the repository. Increments the repository state id by one before executing any modifications on the
      * repository.
@@ -477,7 +507,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
      * @param listener Listener to complete when done
      */
     public void cleanup(long repositoryStateId, ActionListener<RepositoryCleanupResult> listener) {
-        ActionListener.completeWith(listener, () -> {
+        try {
             if (isReadOnly()) {
                 throw new RepositoryException(metadata.name(), "cannot run cleanup on readonly repository");
             }
@@ -495,15 +525,15 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             final List<String> staleRootBlobs = staleRootBlobs(repositoryData, rootBlobs.keySet());
             if (survivingIndexIds.equals(foundIndices.keySet()) && staleRootBlobs.isEmpty()) {
                 // Nothing to clean up we return
-                return new RepositoryCleanupResult(DeleteResult.ZERO);
+                listener.onResponse(new RepositoryCleanupResult(DeleteResult.ZERO));
+            } else {
+                // write new index-N blob to ensure concurrent operations will fail
+                writeIndexGen(repositoryData, repositoryStateId);
+                cleanupStaleBlobs(foundIndices, rootBlobs, repositoryData, ActionListener.map(listener, RepositoryCleanupResult::new));
             }
-            // write new index-N blob to ensure concurrent operations will fail
-            writeIndexGen(repositoryData, repositoryStateId);
-            final DeleteResult deleteIndicesResult = cleanupStaleIndices(foundIndices, survivingIndexIds);
-            List<String> cleaned = cleanupStaleRootFiles(staleRootBlobs);
-            return new RepositoryCleanupResult(
-                deleteIndicesResult.add(cleaned.size(), cleaned.stream().mapToLong(name -> rootBlobs.get(name).length()).sum()));
-        });
+        } catch (Exception e) {
+            listener.onFailure(e);
+        }
     }
 
     // Finds all blobs directly under the repository root path that are not referenced by the current RepositoryData
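
One consequence of the change worth noting: ActionListener.completeWith only fits code that produces its result synchronously, so once cleanupStaleBlobs completes the listener asynchronously on the SNAPSHOT pool, cleanup() has to switch to an explicit try/catch that routes setup failures to listener.onFailure and lets the grouped listener finish the success path later. From a caller's perspective the contract is unchanged; a minimal caller sketch, assuming a repository and a logger are in scope and 42L stands in for the current repository state id:

// Caller sketch; `repository` (a BlobStoreRepository) and `logger` are assumptions for illustration.
repository.cleanup(42L, ActionListener.wrap(
    result -> logger.info("cleanup result [{}]", result),  // invoked once both parallel deletes finish
    e -> logger.warn("cleanup failed", e)));               // setup and async failures both land here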