In #46250 we will have to move to two different delete paths for BwC. Both paths will share the initial reads from the repository though so I extracted/separated the delete logic away from the initial reads to significantly simplify the diff against #46250 here. Also, I added some JavaDoc from #46250 here as well which makes the code a little easier to follow even ignoring #46250 I think.
This commit is contained in:
parent
a6c9f8ffae
commit
a087553009
|
@ -403,56 +403,66 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
if (isReadOnly()) {
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "cannot delete snapshot from a readonly repository"));
|
||||
} else {
|
||||
SnapshotInfo snapshot = null;
|
||||
try {
|
||||
snapshot = getSnapshotInfo(snapshotId);
|
||||
} catch (SnapshotMissingException ex) {
|
||||
listener.onFailure(ex);
|
||||
return;
|
||||
} catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
|
||||
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
||||
}
|
||||
// Delete snapshot from the index file, since it is the maintainer of truth of active snapshots
|
||||
final RepositoryData updatedRepositoryData;
|
||||
final Map<String, BlobContainer> foundIndices;
|
||||
final Map<String, BlobMetaData> rootBlobs;
|
||||
try {
|
||||
rootBlobs = blobContainer().listBlobs();
|
||||
final Map<String, BlobMetaData> rootBlobs = blobContainer().listBlobs();
|
||||
final RepositoryData repositoryData = getRepositoryData(latestGeneration(rootBlobs.keySet()));
|
||||
updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
||||
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
||||
// delete an index that was created by another master node after writing this index-N blob.
|
||||
|
||||
foundIndices = blobStore().blobContainer(indicesPath()).children();
|
||||
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||
final Map<String, BlobContainer> foundIndices = blobStore().blobContainer(indicesPath()).children();
|
||||
doDeleteShardSnapshots(snapshotId, repositoryStateId, foundIndices, rootBlobs, repositoryData, listener);
|
||||
} catch (Exception ex) {
|
||||
listener.onFailure(new RepositoryException(metadata.name(), "failed to delete snapshot [" + snapshotId + "]", ex));
|
||||
return;
|
||||
}
|
||||
final SnapshotInfo finalSnapshotInfo = snapshot;
|
||||
final List<String> snapMetaFilesToDelete =
|
||||
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()));
|
||||
try {
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(snapMetaFilesToDelete);
|
||||
} catch (IOException e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
|
||||
}
|
||||
final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices();
|
||||
deleteIndices(
|
||||
updatedRepositoryData,
|
||||
Optional.ofNullable(finalSnapshotInfo)
|
||||
.map(info -> info.indices().stream().filter(survivingIndices::containsKey)
|
||||
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList()))
|
||||
.orElse(Collections.emptyList()),
|
||||
snapshotId,
|
||||
ActionListener.delegateFailure(listener,
|
||||
(l, v) -> cleanupStaleBlobs(foundIndices,
|
||||
Sets.difference(rootBlobs.keySet(), new HashSet<>(snapMetaFilesToDelete)).stream().collect(
|
||||
Collectors.toMap(Function.identity(), rootBlobs::get)),
|
||||
updatedRepositoryData, ActionListener.map(l, ignored -> null))));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* After updating the {@link RepositoryData} each of the shards directories is individually first moved to the next shard generation
|
||||
* and then has all now unreferenced blobs in it deleted.
|
||||
*
|
||||
* @param snapshotId SnapshotId to delete
|
||||
* @param repositoryStateId Expected repository state id
|
||||
* @param foundIndices All indices folders found in the repository before executing any writes to the repository during this
|
||||
* delete operation
|
||||
* @param rootBlobs All blobs found at the root of the repository before executing any writes to the repository during this
|
||||
* delete operation
|
||||
* @param repositoryData RepositoryData found the in the repository before executing this delete
|
||||
* @param listener Listener to invoke once finished
|
||||
*/
|
||||
private void doDeleteShardSnapshots(SnapshotId snapshotId, long repositoryStateId, Map<String, BlobContainer> foundIndices,
|
||||
Map<String, BlobMetaData> rootBlobs, RepositoryData repositoryData,
|
||||
ActionListener<Void> listener) throws IOException {
|
||||
final RepositoryData updatedRepositoryData = repositoryData.removeSnapshot(snapshotId);
|
||||
// Cache the indices that were found before writing out the new index-N blob so that a stuck master will never
|
||||
// delete an index that was created by another master node after writing this index-N blob.
|
||||
writeIndexGen(updatedRepositoryData, repositoryStateId);
|
||||
SnapshotInfo snapshot = null;
|
||||
try {
|
||||
snapshot = getSnapshotInfo(snapshotId);
|
||||
} catch (SnapshotMissingException ex) {
|
||||
listener.onFailure(ex);
|
||||
return;
|
||||
} catch (IllegalStateException | SnapshotException | ElasticsearchParseException ex) {
|
||||
logger.warn(() -> new ParameterizedMessage("cannot read snapshot file [{}]", snapshotId), ex);
|
||||
}
|
||||
final List<String> snapMetaFilesToDelete =
|
||||
Arrays.asList(snapshotFormat.blobName(snapshotId.getUUID()), globalMetaDataFormat.blobName(snapshotId.getUUID()));
|
||||
try {
|
||||
blobContainer().deleteBlobsIgnoringIfNotExists(snapMetaFilesToDelete);
|
||||
} catch (IOException e) {
|
||||
logger.warn(() -> new ParameterizedMessage("[{}] Unable to delete global metadata files", snapshotId), e);
|
||||
}
|
||||
final Map<String, IndexId> survivingIndices = updatedRepositoryData.getIndices();
|
||||
deleteIndices(
|
||||
updatedRepositoryData,
|
||||
Optional.ofNullable(snapshot).map(info -> info.indices().stream().filter(survivingIndices::containsKey)
|
||||
.map(updatedRepositoryData::resolveIndexId).collect(Collectors.toList())).orElse(Collections.emptyList()),
|
||||
snapshotId,
|
||||
ActionListener.delegateFailure(listener,
|
||||
(l, v) -> cleanupStaleBlobs(foundIndices,
|
||||
Sets.difference(rootBlobs.keySet(), new HashSet<>(snapMetaFilesToDelete)).stream().collect(
|
||||
Collectors.toMap(Function.identity(), rootBlobs::get)),
|
||||
updatedRepositoryData, ActionListener.map(l, ignored -> null))));
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleans up stale blobs directly under the repository root as well as all indices paths that aren't referenced by any existing
|
||||
* snapshots. This method is only to be called directly after a new {@link RepositoryData} was written to the repository and with
|
||||
|
@ -601,6 +611,13 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
return deleteResult;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param repositoryData RepositoryData with the snapshot removed
|
||||
* @param indices Indices to remove the snapshot from (should not contain indices that become completely unreferenced with the
|
||||
* removal of this snapshot as those are cleaned up afterwards by {@link #cleanupStaleBlobs})
|
||||
* @param snapshotId SnapshotId to remove from all the given indices
|
||||
* @param listener Listener to invoke when finished
|
||||
*/
|
||||
private void deleteIndices(RepositoryData repositoryData, List<IndexId> indices, SnapshotId snapshotId,
|
||||
ActionListener<Void> listener) {
|
||||
if (indices.isEmpty()) {
|
||||
|
|
Loading…
Reference in New Issue