Inlining one trivial single-use method and extracting the stale shard path blob calculation to make the diff with #46250 more manageable.
This commit is contained in:
parent
e49be611ad
commit
c045bc7f54
|
@ -1240,14 +1240,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
.collect(Collectors.toSet());
|
.collect(Collectors.toSet());
|
||||||
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
|
final BlobStoreIndexShardSnapshots updatedSnapshots = new BlobStoreIndexShardSnapshots(newSnapshotsList);
|
||||||
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
|
indexShardSnapshotsFormat.writeAtomic(updatedSnapshots, shardContainer, indexGeneration);
|
||||||
// Delete all previous index-N, data- and meta-blobs and that are not referenced by the new index-N and temporary blobs
|
blobsToDelete = unusedBlobs(blobs, survivingSnapshotUUIDs, updatedSnapshots);
|
||||||
blobsToDelete = blobs.keySet().stream().filter(blob ->
|
|
||||||
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|
|
||||||
|| (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
|
|
||||||
&& survivingSnapshotUUIDs.contains(
|
|
||||||
blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
|
|
||||||
|| (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|
|
||||||
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
|
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
|
shardContainer.deleteBlobsIgnoringIfNotExists(blobsToDelete);
|
||||||
|
@ -1262,6 +1255,20 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Unused blobs are all previous index-, data- and meta-blobs and that are not referenced by the new index- as well as all
|
||||||
|
// temporary blobs
|
||||||
|
private static List<String> unusedBlobs(Map<String, BlobMetaData> blobs, Set<String> survivingSnapshotUUIDs,
|
||||||
|
BlobStoreIndexShardSnapshots updatedSnapshots) {
|
||||||
|
return blobs.keySet().stream().filter(blob ->
|
||||||
|
blob.startsWith(SNAPSHOT_INDEX_PREFIX)
|
||||||
|
|| (blob.startsWith(SNAPSHOT_PREFIX) && blob.endsWith(".dat")
|
||||||
|
&& survivingSnapshotUUIDs.contains(
|
||||||
|
blob.substring(SNAPSHOT_PREFIX.length(), blob.length() - ".dat".length())) == false)
|
||||||
|
|| (blob.startsWith(DATA_BLOB_PREFIX) && updatedSnapshots.findNameFile(canonicalName(blob)) == null)
|
||||||
|
|| FsBlobContainer.isTempBlobName(blob)).collect(Collectors.toList());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Loads information about shard snapshot
|
* Loads information about shard snapshot
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -153,7 +153,10 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
if ((previousSnapshots == null && currentSnapshots != null)
|
if ((previousSnapshots == null && currentSnapshots != null)
|
||||||
|| (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) {
|
|| (previousSnapshots != null && previousSnapshots.equals(currentSnapshots) == false)) {
|
||||||
synchronized (shardSnapshots) {
|
synchronized (shardSnapshots) {
|
||||||
processIndexShardSnapshots(currentSnapshots);
|
cancelRemoved(currentSnapshots);
|
||||||
|
if (currentSnapshots != null) {
|
||||||
|
startNewSnapshots(currentSnapshots);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -199,18 +202,6 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Checks if any new shards should be snapshotted on this node
|
|
||||||
*
|
|
||||||
* @param snapshotsInProgress Current snapshots in progress in cluster state
|
|
||||||
*/
|
|
||||||
private void processIndexShardSnapshots(SnapshotsInProgress snapshotsInProgress) {
|
|
||||||
cancelRemoved(snapshotsInProgress);
|
|
||||||
if (snapshotsInProgress != null) {
|
|
||||||
startNewSnapshots(snapshotsInProgress);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) {
|
private void cancelRemoved(@Nullable SnapshotsInProgress snapshotsInProgress) {
|
||||||
// First, remove snapshots that are no longer there
|
// First, remove snapshots that are no longer there
|
||||||
Iterator<Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>>> it = shardSnapshots.entrySet().iterator();
|
Iterator<Map.Entry<Snapshot, Map<ShardId, IndexShardSnapshotStatus>>> it = shardSnapshots.entrySet().iterator();
|
||||||
|
|
|
@ -103,7 +103,7 @@ import static org.elasticsearch.cluster.SnapshotsInProgress.completed;
|
||||||
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
|
* the {@link #beginSnapshot(ClusterState, SnapshotsInProgress.Entry, boolean, ActionListener)} method kicks in and initializes
|
||||||
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
|
* the snapshot in the repository and then populates list of shards that needs to be snapshotted in cluster state</li>
|
||||||
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
|
* <li>Each data node is watching for these shards and when new shards scheduled for snapshotting appear in the cluster state, data nodes
|
||||||
* start processing them through {@link SnapshotShardsService#processIndexShardSnapshots(SnapshotsInProgress)} method</li>
|
* start processing them through {@link SnapshotShardsService#startNewSnapshots} method</li>
|
||||||
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using
|
* <li>Once shard snapshot is created data node updates state of the shard in the cluster state using
|
||||||
* the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
|
* the {@link SnapshotShardsService#sendSnapshotShardUpdate(Snapshot, ShardId, ShardSnapshotStatus)} method</li>
|
||||||
* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
|
* <li>When last shard is completed master node in {@link SnapshotShardsService#innerUpdateSnapshotState} method marks the snapshot
|
||||||
|
|
Loading…
Reference in New Issue