This change removes the special path for deleting the index metadata blobs and moves deleting them to the bulk delete of unreferenced blobs at the end of the snapshot delete process. This saves N RPC calls for a snapshot containing N indices and simplifies the code. Also, this change moves the unreferenced data cleanup up the stack to make it more obvious that any exceptions during this phase will be ignored and not fail the delete request. Lastly, this change removes the needless chaining of first deleting unreferenced data from the snapshot delete and then running the stale data cleanup (that would also run from the cleanup endpoint) and simply fires off the cleanup right after updating the repository data (index-N) in parallel to the other delete operations to speed up the delete some more.
This commit is contained in:
parent
b5ac0204d2
commit
b669b8f046
|
@ -107,6 +107,7 @@ import java.util.Set;
|
|||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static org.elasticsearch.index.snapshots.blobstore.BlobStoreIndexShardSnapshot.FileInfo.canonicalName;
|
||||
|
||||
|
@ -429,12 +430,43 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
ActionListener<Void> listener) throws IOException {
|
||||
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
||||
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||
final ActionListener<Void> afterCleanupsListener =
|
||||
new GroupedActionListener<>(ActionListener.wrap(() -> listener.onResponse(null)), 2);
|
||||
|
||||
// Run unreferenced blobs cleanup in parallel to snapshot deletion
|
||||
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(ActionRunnable.wrap(afterCleanupsListener,
|
||||
l -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
|
||||
|
||||
deleteIndices(
|
||||
updatedRepositoryData,
|
||||
repositoryData.indicesToUpdateAfterRemovingSnapshot(snapshotId),
|
||||
snapshotId,
|
||||
ActionListener.delegateFailure(listener,
|
||||
(l, v) -> cleanupStaleBlobs(foundIndices, rootBlobs, updatedRepositoryData, ActionListener.map(l, ignored -> null))));
|
||||
ActionListener.runAfter(
|
||||
ActionListener.wrap(
|
||||
deleteResults -> {
|
||||
// Now that all metadata (RepositoryData at the repo root as well as index-N blobs in all shard paths)
|
||||
// has been updated we can execute the delete operations for all blobs that have become unreferenced as a result
|
||||
final String basePath = basePath().buildAsString();
|
||||
final int basePathLen = basePath.length();
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(
|
||||
Stream.concat(
|
||||
deleteResults.stream().flatMap(shardResult -> {
|
||||
final String shardPath =
|
||||
shardContainer(shardResult.indexId, shardResult.shardId).path().buildAsString();
|
||||
return shardResult.blobsToDelete.stream().map(blob -> shardPath + blob);
|
||||
}),
|
||||
deleteResults.stream().map(shardResult -> shardResult.indexId).distinct().map(indexId ->
|
||||
indexContainer(indexId).path().buildAsString() + globalMetaDataFormat.blobName(snapshotId.getUUID()))
|
||||
).map(absolutePath -> {
|
||||
assert absolutePath.startsWith(basePath);
|
||||
return absolutePath.substring(basePathLen);
|
||||
}).collect(Collectors.toList()));
|
||||
},
|
||||
// Any exceptions after we have updated the root level RepositoryData are only logged but won't fail the delete request
|
||||
e -> logger.warn(
|
||||
() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e)),
|
||||
() -> afterCleanupsListener.onResponse(null))
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -593,90 +625,58 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
* @param listener Listener to invoke when finished
|
||||
*/
|
||||
private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,
|
||||
ActionListener<Void> listener) {
|
||||
ActionListener<Collection<ShardSnapshotMetaDeleteResult>> listener) {
|
||||
|
||||
if (indices.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
listener.onResponse(Collections.emptyList());
|
||||
return;
|
||||
}
|
||||
// listener to complete once all shards folders affected by this delete have been added new metadata blobs without this snapshot
|
||||
final StepListener<Collection<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>();
|
||||
|
||||
// Listener that flattens out the delete results for each index
|
||||
final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
|
||||
ActionListener.map(deleteFromMetaListener,
|
||||
results -> results.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
|
||||
ActionListener.map(listener, res -> res.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
|
||||
final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
|
||||
for (IndexId indexId : indices) {
|
||||
executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener,
|
||||
deleteIdxMetaListener -> {
|
||||
IndexMetaData indexMetaData = null;
|
||||
final IndexMetaData indexMetaData;
|
||||
try {
|
||||
indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
|
||||
} catch (Exception ex) {
|
||||
logger.warn(() ->
|
||||
new ParameterizedMessage("[{}] [{}] failed to read metadata for index", snapshotId, indexId.getName()), ex);
|
||||
}
|
||||
deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId);
|
||||
if (indexMetaData != null) {
|
||||
final int shardCount = indexMetaData.getNumberOfShards();
|
||||
assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
|
||||
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
|
||||
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
|
||||
new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
|
||||
final Index index = indexMetaData.getIndex();
|
||||
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
|
||||
final ShardId shard = new ShardId(index, shardId);
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
allShardsListener.onResponse(
|
||||
deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception ex) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
|
||||
snapshotId, indexId.getName(), shard.id()), ex);
|
||||
// Just passing null here to count down the listener instead of failing it, the stale data left behind
|
||||
// here will be retried in the next delete or repository cleanup
|
||||
allShardsListener.onResponse(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
// Just invoke the listener without any shard generations to count it down, this index will be cleaned up
|
||||
// by the stale data cleanup in the end.
|
||||
deleteIdxMetaListener.onResponse(null);
|
||||
return;
|
||||
}
|
||||
final int shardCount = indexMetaData.getNumberOfShards();
|
||||
assert shardCount > 0 : "index did not have positive shard count, get [" + shardCount + "]";
|
||||
// Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
|
||||
final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
|
||||
new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
|
||||
final Index index = indexMetaData.getIndex();
|
||||
for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
|
||||
final ShardId shard = new ShardId(index, shardId);
|
||||
executor.execute(new AbstractRunnable() {
|
||||
@Override
|
||||
protected void doRun() throws Exception {
|
||||
allShardsListener.onResponse(
|
||||
deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception ex) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
|
||||
snapshotId, indexId.getName(), shard.id()), ex);
|
||||
// Just passing null here to count down the listener instead of failing it, the stale data left behind
|
||||
// here will be retried in the next delete or repository cleanup
|
||||
allShardsListener.onResponse(null);
|
||||
}
|
||||
});
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
// Delete all the now unreferenced blobs in the shard paths
|
||||
deleteFromMetaListener.whenComplete(newGens -> {
|
||||
final String basePath = basePath().buildAsString();
|
||||
final int basePathLen = basePath.length();
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(
|
||||
newGens.stream().flatMap(shardBlob -> {
|
||||
final String shardPathAbs = shardContainer(shardBlob.indexId, shardBlob.shardId).path().buildAsString();
|
||||
assert shardPathAbs.startsWith(basePath);
|
||||
final String pathToShard = shardPathAbs.substring(basePathLen);
|
||||
return shardBlob.blobsToDelete.stream().map(blob -> pathToShard + blob);
|
||||
}).collect(Collectors.toList())
|
||||
);
|
||||
listener.onResponse(null);
|
||||
}, e -> {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e);
|
||||
listener.onResponse(null);
|
||||
});
|
||||
}
|
||||
|
||||
private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
|
||||
try {
|
||||
indexMetaDataFormat.delete(indexContainer(indexId), snapshotId.getUUID());
|
||||
} catch (IOException ex) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete metadata for index [{}]",
|
||||
snapshotId, indexId.getName()), ex);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue