Speed up and Reorder Snapshot Delete Operations (#47293) (#47350)

This is a preliminary step towards #46250, making the snapshot
delete work by doing all the metadata updates first
and then bulk deleting all of the now unreferenced
blobs.
Before this change, the metadata updates for each shard
and the subsequent deletion of the blobs that had become unreferenced
by the delete happened sequentially, shard by shard,
parallelising only across the indices in the snapshot.
This change makes all of the shard-level metadata updates
happen in parallel first.
Once all of the shard-level metadata updates have finished,
all the now unreferenced blobs are deleted in bulk.
This has two benefits (besides making #46250 a smaller change):
* We are less likely to fail to update shard-level metadata, because
those updates run first and with a higher degree of parallelism.
* Deleting unreferenced data in the shards should go much faster in many cases (rolling indices, a large number of indices with many unchanged shards), because many small bulk deletions (just two blobs, `index-N` and `snap-`, for each unchanged shard) are grouped into larger bulk deletes of `100-1000` blobs depending on the cloud provider. Even though the final bulk deletes happen sequentially, this should still be much faster in almost all cases, since you would need a parallelism of 50 (GCS) to 500 (S3) snapshot threads to achieve the same delete rates when deleting from unchanged shards.
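To make the reordering concrete, here is a minimal, self-contained sketch of the two-phase pattern in plain Java. It is not the Elasticsearch implementation shown in the diff below; `updateShardMetadata` and `bulkDelete` are hypothetical placeholders. Phase one runs every shard-level metadata update in parallel and collects the blob names each shard no longer references; only after all updates have completed does phase two issue a single bulk delete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class TwoPhaseDeleteSketch {

    // Hypothetical placeholder: write the new shard-level metadata (index-N) blob for
    // one shard and return the blob names the new generation no longer references.
    static List<String> updateShardMetadata(int shard) {
        return List.of("indices/idx/" + shard + "/index-old", "indices/idx/" + shard + "/snap-old.dat");
    }

    // Hypothetical placeholder: a single bulk-delete call against the blob store.
    static void bulkDelete(List<String> blobs) {
        System.out.println("bulk deleting " + blobs.size() + " blobs");
    }

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        // Phase 1: all shard-level metadata updates run in parallel.
        List<Callable<List<String>>> updates = new ArrayList<>();
        for (int shard = 0; shard < 8; shard++) {
            final int s = shard;
            updates.add(() -> updateShardMetadata(s));
        }
        // invokeAll returns only once every metadata update has finished.
        List<String> unreferenced = new ArrayList<>();
        for (Future<List<String>> result : executor.invokeAll(updates)) {
            unreferenced.addAll(result.get());
        }
        // Phase 2: one bulk delete covering everything that became unreferenced,
        // instead of a small delete per shard.
        bulkDelete(unreferenced);
        executor.shutdown();
    }
}
```

The actual change expresses the same join point with a `StepListener`/`GroupedActionListener` pipeline rather than blocking `Future`s, since repository operations in Elasticsearch are asynchronous.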
Armin Braun 2019-10-01 19:05:43 +02:00 committed by GitHub
parent e70220857d
commit 3d6ef6a90e
1 changed file with 87 additions and 24 deletions


@@ -60,12 +60,14 @@ import org.elasticsearch.common.metrics.CounterMetric;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeUnit;
 import org.elasticsearch.common.unit.ByteSizeValue;
+import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.XContentFactory;
 import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.index.mapper.MapperService;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.index.snapshots.IndexShardRestoreFailedException;
@@ -623,12 +625,17 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             listener.onResponse(null);
             return;
         }
-        final ActionListener<Void> groupedListener = new GroupedActionListener<>(ActionListener.map(listener, v -> null), indices.size());
-        for (IndexId indexId: indices) {
-            threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(new ActionRunnable<Void>(groupedListener) {
+        // Listener that completes once all shard folders affected by this delete have had new metadata blobs written without this snapshot
+        final StepListener<Collection<ShardSnapshotMetaDeleteResult>> deleteFromMetaListener = new StepListener<>();
-            @Override
-            protected void doRun() {
+
+        // Listener that flattens out the delete results for each index
+        final ActionListener<Collection<ShardSnapshotMetaDeleteResult>> deleteIndexMetaDataListener = new GroupedActionListener<>(
+            ActionListener.map(deleteFromMetaListener,
+                results -> results.stream().flatMap(Collection::stream).collect(Collectors.toList())), indices.size());
+        final Executor executor = threadPool.executor(ThreadPool.Names.SNAPSHOT);
+        for (IndexId indexId : indices) {
+            executor.execute(ActionRunnable.wrap(deleteIndexMetaDataListener,
+                deleteIdxMetaListener -> {
                     IndexMetaData indexMetaData = null;
                     try {
                         indexMetaData = getSnapshotIndexMetaData(snapshotId, indexId);
@@ -638,20 +645,56 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                     }
                     deleteIndexMetaDataBlobIgnoringErrors(snapshotId, indexId);
                     if (indexMetaData != null) {
+                        final int shardCount = indexMetaData.getNumberOfShards();
+                        assert shardCount > 0 : "index did not have a positive shard count, got [" + shardCount + "]";
+                        // Listener for collecting the results of removing the snapshot from each shard's metadata in the current index
+                        final ActionListener<ShardSnapshotMetaDeleteResult> allShardsListener =
+                            new GroupedActionListener<>(deleteIdxMetaListener, shardCount);
+                        final Index index = indexMetaData.getIndex();
                         for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
-                            try {
-                                deleteShardSnapshot(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
-                            } catch (Exception ex) {
-                                final int finalShardId = shardId;
-                                logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
-                                    snapshotId, indexId.getName(), finalShardId), ex);
-                            }
+                            final ShardId shard = new ShardId(index, shardId);
+                            executor.execute(new AbstractRunnable() {
+                                @Override
+                                protected void doRun() throws Exception {
+                                    allShardsListener.onResponse(
+                                        deleteShardSnapshot(repositoryData, indexId, shard, snapshotId));
+                                }
+
+                                @Override
+                                public void onFailure(Exception ex) {
+                                    logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
+                                        snapshotId, indexId.getName(), shard.id()), ex);
+                                    // Pass null here to count down the listener instead of failing it; the stale data left behind
+                                    // here will be retried by the next delete or repository cleanup
+                                    allShardsListener.onResponse(null);
+                                }
+                            });
                         }
+                    } else {
+                        // Invoke the listener without any shard generations to count it down; this index will be cleaned up
+                        // by the stale data cleanup in the end.
+                        deleteIdxMetaListener.onResponse(null);
                     }
-                    groupedListener.onResponse(null);
-                }
-            });
+                }));
         }
+
+        // Delete all the now unreferenced blobs in the shard paths
+        deleteFromMetaListener.whenComplete(newGens -> {
+            final String basePath = basePath().buildAsString();
+            final int basePathLen = basePath.length();
+            blobContainer().deleteBlobsIgnoringIfNotExists(
+                newGens.stream().flatMap(shardBlob -> {
+                    final String shardPathAbs = shardContainer(shardBlob.indexId, shardBlob.shardId).path().buildAsString();
+                    assert shardPathAbs.startsWith(basePath);
+                    final String pathToShard = shardPathAbs.substring(basePathLen);
+                    return shardBlob.blobsToDelete.stream().map(blob -> pathToShard + blob);
+                }).collect(Collectors.toList())
+            );
+            listener.onResponse(null);
+        }, e -> {
+            logger.warn(() -> new ParameterizedMessage("[{}] Failed to delete some blobs during snapshot delete", snapshotId), e);
+            listener.onResponse(null);
+        });
     }

     private void deleteIndexMetaDataBlobIgnoringErrors(SnapshotId snapshotId, IndexId indexId) {
@@ -756,7 +799,11 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     }

     private BlobContainer shardContainer(IndexId indexId, ShardId shardId) {
-        return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId.getId())));
+        return shardContainer(indexId, shardId.getId());
     }

+    private BlobContainer shardContainer(IndexId indexId, int shardId) {
+        return blobStore().blobContainer(indicesPath().add(indexId.getId()).add(Integer.toString(shardId)));
+    }
+
     /**
@@ -1239,8 +1286,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     /**
      * Delete shard snapshot
      */
-    private void deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId)
-        throws IOException {
+    private ShardSnapshotMetaDeleteResult deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId,
+                                                              SnapshotId snapshotId) throws IOException {
         final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
         final Map<String, BlobMetaData> blobs;
         try {
@@ -1275,12 +1322,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
                 indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
                 blobsToDelete = unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots);
             }
-            try {
-                shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
-            } catch (IOException e) {
-                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
-                    snapshotId, snapshotShardId), e);
-            }
+            return new ShardSnapshotMetaDeleteResult(indexId, snapshotShardId.id(), blobsToDelete);
         } catch (IOException e) {
             throw new IndexShardSnapshotFailedException(snapshotShardId,
                 "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
@@ -1397,4 +1439,25 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
         }
     }
+
+    /**
+     * The result of removing a snapshot from a shard folder in the repository.
+     */
+    private static final class ShardSnapshotMetaDeleteResult {
+
+        // Index that the snapshot was removed from
+        private final IndexId indexId;
+
+        // Shard id that the snapshot was removed from
+        private final int shardId;
+
+        // Blob names in the shard directory that have become unreferenced in the new shard generation
+        private final Collection<String> blobsToDelete;
+
+        ShardSnapshotMetaDeleteResult(IndexId indexId, int shardId, Collection<String> blobsToDelete) {
+            this.indexId = indexId;
+            this.shardId = shardId;
+            this.blobsToDelete = blobsToDelete;
+        }
+    }
 }