Stronger Cleanup of Shard Snapshot Directory on Delete

* Use `RepositoryData` to clean up unreferenced `snap-${uuid}.dat` blobs from shard directories (and stale index-N blobs), and as a result also clean up data blobs that are referenced only by them (see the sketch below)
* Stop cleaning up anything but index-N blobs on shard snapshot creation, aligning the shard-level index-N handling with the root-path index-N handling
parent 22dc125dad
commit eb1106c465
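For readers skimming the diff, here is a minimal, self-contained sketch of the deletion-time cleanup predicate described in the first bullet. It is not the Elasticsearch source: the class name `StaleShardBlobFilter` and the `referencedDataBlobs` parameter (standing in for the `BlobStoreIndexShardSnapshots#findNameFile` lookup) are illustrative only.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class StaleShardBlobFilter {
    // Blob-name prefixes mirroring the "index-", "snap-" and "__" constants
    // used by BlobStoreRepository.
    private static final String SNAPSHOT_INDEX_PREFIX = "index-";
    private static final String SNAPSHOT_PREFIX = "snap-";
    private static final String DATA_BLOB_PREFIX = "__";

    // Blobs in a shard directory that are unreferenced after a snapshot deletion:
    // every old index-N blob, every snap-${uuid}.dat whose snapshot does not
    // survive the deletion, and every data blob no surviving snapshot references.
    static List<String> staleBlobs(Set<String> shardBlobs,
                                   Set<String> survivingSnapshotUUIDs,
                                   Set<String> referencedDataBlobs) {
        return shardBlobs.stream().filter(blob ->
            blob.startsWith(SNAPSHOT_INDEX_PREFIX)
                || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
                    && survivingSnapshotUUIDs.contains(
                        blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
                || (blob.startsWith(DATA_BLOB_PREFIX) && referencedDataBlobs.contains(blob) == false)
        ).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Snapshot "b" is deleted; only snapshot "a" (referencing __data0) survives.
        System.out.println(staleBlobs(
            Set.of("index-4", "snap-a.dat", "snap-b.dat", "__data0", "__data1"),
            Set.of("a"),
            Set.of("__data0")));
        // Prints (in some order): [index-4, snap-b.dat, __data1]
    }
}
```

In the change itself, `survivingSnapshotUUIDs` is derived from `RepositoryData.getSnapshots(indexId)`, which is what makes the cleanup "stronger": blobs are judged against the repository's authoritative snapshot list rather than only against the shard-level metadata.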
```diff
@@ -444,6 +444,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
             final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices();
             deleteIndices(
+                updatedRepositoryData,
                 Optional.ofNullable(finalSnapshotInfo)
                     .map(info -> info.indices().stream().filter(survivingIndices::containsKey)
                         .map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
@@ -528,7 +529,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             }
         }
 
-    private void deleteIndices(List<IndexId> indices, SnapshotId snapshotId, ActionListener<Void> listener) {
+    private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,
+                               ActionListener<Void> listener) {
         if (indices.isEmpty()) {
             listener.onResponse(null);
             return;
@@ -550,7 +552,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             if (indexMetaData != null) {
                 for (int shardId = 0; shardId < indexMetaData.getNumberOfShards(); shardId++) {
                     try {
-                        deleteShardSnapshot(indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
+                        deleteShardSnapshot(repositoryData, indexId, new ShardId(indexMetaData.getIndex(), shardId), snapshotId);
                     } catch (SnapshotException ex) {
                         final int finalShardId = shardId;
                         logger.warn(() -> new ParameterizedMessage("[{}] failed to delete shard data for shard [{}][{}]",
@@ -977,9 +979,29 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
             for (SnapshotFiles point : snapshots) {
                 newSnapshotsList.add(point);
             }
-            // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
-            finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot creation [" + snapshotId + "]", shardContainer,
-                shardId, snapshotId);
+            final String indexGeneration = Long.toString(fileListGeneration + 1);
+            final List<String> blobsToDelete;
+            try {
+                final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
+                indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+                // Delete all previous index-N blobs
+                blobsToDelete =
+                    blobs.keySet().stream().filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX)).collect(Collectors.toList());
+                assert blobsToDelete.stream().mapToLong(b -> Long.parseLong(b.replaceFirst(SNAPSHOT_INDEX_PREFIX, "")))
+                    .max().orElse(-1L) < Long.parseLong(indexGeneration)
+                    : "Tried to delete an index-N blob newer than the current generation [" + indexGeneration
+                    + "] when deleting index-N blobs " + blobsToDelete;
+            } catch (IOException e) {
+                throw new IndexShardSnapshotFailedException(shardId,
+                    "Failed to finalize snapshot creation [" + snapshotId + "] with shard index ["
+                        + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
+            }
+            try {
+                shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
+            } catch (IOException e) {
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete old index-N blobs during finalization",
+                    snapshotId, shardId), e);
+            }
             snapshotStatus.moveToDone(threadPool.absoluteTimeInMillis());
         } catch (Exception e) {
             snapshotStatus.moveToFailed(threadPool.absoluteTimeInMillis(), ExceptionsHelper.detailedMessage(e));
@@ -1067,7 +1089,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
     /**
      * Delete shard snapshot
      */
-    private void deleteShardSnapshot(IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) {
+    private void deleteShardSnapshot(RepositoryData repositoryData, IndexId indexId, ShardId snapshotShardId, SnapshotId snapshotId) {
         final BlobContainer shardContainer = shardContainer(indexId, snapshotShardId);
         final Map<String, BlobMetaData> blobs;
         try {
@@ -1082,19 +1104,43 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
 
         // Build a list of snapshots that should be preserved
         List<SnapshotFiles> newSnapshotsList = new ArrayList<>();
+        final Set<String> survivingSnapshotNames =
+            repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getName).collect(Collectors.toSet());
         for (SnapshotFiles point : snapshots) {
-            if (!point.snapshot().equals(snapshotId.getName())) {
+            if (survivingSnapshotNames.contains(point.snapshot())) {
                 newSnapshotsList.add(point);
             }
         }
-        // finalize the snapshot and rewrite the snapshot index with the next sequential snapshot index
-        finalizeShard(newSnapshotsList, fileListGeneration, blobs, "snapshot deletion [" + snapshotId + "]", shardContainer,
-            snapshotShardId, snapshotId);
-
+        final String indexGeneration = Long.toString(fileListGeneration + 1);
         try {
-            shardContainer.deleteBlobIgnoringIfNotExists(indexShardSnapshotFormat.blobName(snapshotId.getUUID()));
+            final List<String> blobsToDelete;
+            if (newSnapshotsList.isEmpty()) {
+                // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
+                blobsToDelete = new ArrayList<>(blobs.keySet());
+            } else {
+                final Set<String> survivingSnapshotUUIDs = repositoryData.getSnapshots(indexId).stream().map(SnapshotId::getUUID)
+                    .collect(Collectors.toSet());
+                final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
+                indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
+                // Delete all previous index-N blobs, snap-${uuid}.dat blobs of deleted snapshots, data blobs not referenced by the new index-N, and temporary blobs
+                blobsToDelete = blobs.keySet().stream().filter(blob ->
+                    blob.startsWith(SNAPSHOT_INDEX_PREFIX)
+                        || (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
+                            && survivingSnapshotUUIDs.contains(
+                                blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
+                        || (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
+                        || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
+            }
+            try {
+                shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
             } catch (IOException e) {
-            logger.warn(new ParameterizedMessage("[{}] [{}] failed to delete shard snapshot file", snapshotShardId, snapshotId), e);
+                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
+                    snapshotId, snapshotShardId), e);
             }
+        } catch (IOException e) {
+            throw new IndexShardSnapshotFailedException(snapshotShardId,
+                "Failed to finalize snapshot deletion [" + snapshotId + "] with shard index ["
+                    + indexShardSnapshotsFormat.blobName(indexGeneration) + "]", e);
         }
     }
 
@@ -1110,47 +1156,6 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
         }
     }
 
-    /**
-     * Writes a new index file for the shard and removes all unreferenced files from the repository.
-     *
-     * We need to be really careful in handling index files in case of failures to make sure we don't
-     * have index file that points to files that were deleted.
-     *
-     * @param snapshots list of active snapshots in the container
-     * @param fileListGeneration the generation number of the current snapshot index file
-     * @param blobs list of blobs in the container
-     * @param reason a reason explaining why the shard index file is written
-     */
-    private void finalizeShard(List<SnapshotFiles> snapshots, long fileListGeneration, Map<String, BlobMetaData> blobs,
-                               String reason, BlobContainer shardContainer, ShardId shardId, SnapshotId snapshotId) {
-        final String indexGeneration = Long.toString(fileListGeneration + 1);
-        try {
-            final List<String> blobsToDelete;
-            if (snapshots.isEmpty()) {
-                // If we deleted all snapshots, we don't need to create a new index file and simply delete all the blobs we found
-                blobsToDelete = new ArrayList<>(blobs.keySet());
-            } else {
-                final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(snapshots);
-                indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
-                // Delete all previous index-N, data-blobs that are not referenced by the new index-N and temporary blobs
-                blobsToDelete = blobs.keySet().stream().filter(blob ->
-                    blob.startsWith(SNAPSHOT_INDEX_PREFIX)
-                        || blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null
-                        || FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
-            }
-            try {
-                shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
-            } catch (IOException e) {
-                logger.warn(() -> new ParameterizedMessage("[{}][{}] failed to delete blobs during finalization",
-                    snapshotId, shardId), e);
-            }
-        } catch (IOException e) {
-            String message =
-                "Failed to finalize " + reason + " with shard index [" + indexShardSnapshotsFormat.blobName(indexGeneration) + "]";
-            throw new IndexShardSnapshotFailedException(shardId, message, e);
-        }
-    }
-
     /**
      * Loads all available snapshots in the repository
      *
```
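A matching sketch for the second bullet: on shard snapshot creation, only stale index-N blobs are removed after the new generation is written, guarded by the same assertion the commit adds. Again an illustration under assumed names (`CreationTimeCleanup`, `staleIndexNBlobs`), not the ES API.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class CreationTimeCleanup {
    private static final String SNAPSHOT_INDEX_PREFIX = "index-";

    // shardBlobs is the directory listing taken before index-(newGeneration)
    // was written, so every index-N blob in it is an older generation.
    static List<String> staleIndexNBlobs(Set<String> shardBlobs, long newGeneration) {
        final List<String> blobsToDelete = shardBlobs.stream()
            .filter(blob -> blob.startsWith(SNAPSHOT_INDEX_PREFIX))
            .collect(Collectors.toList());
        // Same invariant the commit asserts: never delete an index-N blob at or
        // above the generation that was just written.
        assert blobsToDelete.stream()
            .mapToLong(b -> Long.parseLong(b.substring(SNAPSHOT_INDEX_PREFIX.length())))
            .max().orElse(-1L) < newGeneration
            : "Tried to delete an index-N blob newer than generation [" + newGeneration + "]";
        return blobsToDelete;
    }

    public static void main(String[] args) {
        // index-5 was just written; index-3 and index-4 are stale, nothing else is touched.
        System.out.println(staleIndexNBlobs(Set.of("index-3", "index-4", "snap-a.dat", "__data0"), 5L));
    }
}
```

Leaving snap- and data-blob cleanup entirely to the delete path is the alignment with root-path index-N handling that the commit message calls out: creation only rolls the generation forward.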