Optimize Snapshot Shard Status Update Handling (#62070) (#62219)

Avoid a number of no-op updates that were observed to cause trouble (needless no-op cluster-state publishing), which can become an issue when working with a large number of concurrent snapshot operations.
Also this sets up some simplifications made in the clone snapshot branch.
This commit is contained in:
Armin Braun 2020-09-10 16:29:16 +02:00 committed by GitHub
parent c8981ea93d
commit 7b941a18e9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 40 deletions

View File

@@ -213,6 +213,34 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
failure, userMetadata, version);
}
/**
 * Returns a copy of this entry with its shard snapshot states replaced by the given map.
 * When every shard snapshot in {@code shards} is in a completed state, the resulting entry is
 * moved to {@link State#SUCCESS}; otherwise the entry's current state is carried over unchanged.
 *
 * @param shards updated map of shard snapshot states
 * @return updated snapshot entry
 */
public Entry withShardStates(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
    if (completed(shards.values()) == false) {
        // Not all shards are done yet; delegate to the variant that skips the completion check.
        return withStartedShards(shards);
    }
    // Every shard snapshot finished, so the snapshot as a whole transitions to SUCCESS.
    return new Entry(snapshot, includeGlobalState, partial, State.SUCCESS, indices, dataStreams, startTime, repositoryStateId,
        shards, failure, userMetadata, version);
}
/**
 * Identical to {@link #withShardStates} except that no completion check is performed, so it must
 * only be used when starting new shard snapshots on data nodes for a snapshot that is known to
 * still be running.
 */
public Entry withStartedShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
    final SnapshotsInProgress.Entry result = new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams,
        startTime, repositoryStateId, shards, failure, userMetadata, version);
    // This path is only legal for running snapshots; neither the entry state nor the shard map may be completed.
    final boolean stillRunning = result.state().completed() == false && completed(result.shards().values()) == false;
    assert stillRunning : "Only running snapshots allowed but saw [" + result + "]";
    return result;
}
@Override
public String repository() {
return snapshot.getRepository();

View File

@@ -2458,63 +2458,77 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// NOTE(review): this span is a diff rendering whose +/- markers were stripped, so removed and
// added lines are interleaved below; it is NOT compilable Java as-is. Comments annotate the
// post-change logic where it is identifiable from the surviving added lines.
public ClusterTasksResult<UpdateIndexShardSnapshotStatusRequest>
execute(ClusterState currentState, List<UpdateIndexShardSnapshotStatusRequest> tasks) {
int changedCount = 0;
int startedCount = 0;
final List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
final Map<String, Set<ShardId>> reusedShardIdsByRepo = new HashMap<>();
// Tasks to check for updates for running snapshots.
final List<UpdateIndexShardSnapshotStatusRequest> unconsumedTasks = new ArrayList<>(tasks);
// Tasks that were used to complete an existing in-progress shard snapshot
final Set<UpdateIndexShardSnapshotStatusRequest> executedTasks = new HashSet<>();
// Walk every in-progress snapshot entry in the current cluster state, applying any pending
// shard-status update tasks that match it.
for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean updated = false;
for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) {
// Completed entries cannot change any more; keep them as-is.
if (entry.state().completed()) {
entries.add(entry);
continue;
}
// Builder is created lazily, only once an actual change to this entry's shards is found.
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = null;
for (Iterator<UpdateIndexShardSnapshotStatusRequest> iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) {
final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next();
final Snapshot updatedSnapshot = updateSnapshotState.snapshot();
final String updatedRepository = updatedSnapshot.getRepository();
if (entry.repository().equals(updatedRepository) == false) {
continue;
}
final ShardId finishedShardId = updateSnapshotState.shardId();
if (entry.snapshot().equals(updateSnapshotState.snapshot())) {
logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(),
if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) {
final ShardSnapshotStatus existing = entry.shards().get(finishedShardId);
if (existing == null) {
logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]",
updateSnapshotState, entry);
assert false : "This should never happen, data nodes should only send updates for expected shards";
continue;
}
if (existing.state().completed()) {
// No point in doing noop updates that might happen if data nodes resend shard status after a disconnect.
iterator.remove();
continue;
}
logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot,
finishedShardId, updateSnapshotState.status().state());
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
shards.put(finishedShardId, updateSnapshotState.status());
// Remember this task so a later (queued) snapshot in the same repo may start this shard.
executedTasks.add(updateSnapshotState);
changedCount++;
} else {
final String updatedRepository = updateSnapshotState.snapshot().getRepository();
final Set<ShardId> reusedShardIds = reusedShardIdsByRepo.computeIfAbsent(updatedRepository, k -> new HashSet<>());
if (entry.state().completed() == false && entry.repository().equals(updatedRepository)
&& reusedShardIds.contains(finishedShardId) == false) {
} else if (executedTasks.contains(updateSnapshotState)) {
// tasks that completed a shard might allow starting a new shard snapshot for the current snapshot
final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId);
if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) {
continue;
}
if (updated == false) {
shards.putAll(entry.shards());
updated = true;
if (shards == null) {
shards = ImmutableOpenMap.builder(entry.shards());
}
final ShardSnapshotStatus finishedStatus = updateSnapshotState.status();
logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId,
finishedStatus.nodeId(), finishedStatus.generation());
// Promote the queued shard to started, reusing the finished shard's node and generation.
shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation()));
reusedShardIds.add(finishedShardId);
}
iterator.remove();
startedCount++;
}
}
if (updated) {
if (completed(shards.values()) == false) {
entries.add(new SnapshotsInProgress.Entry(entry, shards.build()));
} else {
// Snapshot is finished - mark it as done
// TODO: Add PARTIAL_SUCCESS status?
SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build());
entries.add(updatedEntry);
}
} else {
// shards == null means no task touched this entry, so it is reused unchanged.
if (shards == null) {
entries.add(entry);
} else {
entries.add(entry.withShardStates(shards.build()));
}
}
// Only publish a new cluster state when at least one shard status actually changed;
// otherwise return the current state untouched to avoid a no-op publication.
if (changedCount > 0) {
logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount);
logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " +
"[{}] shard snapshots", changedCount, startedCount);
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks)
.build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(unmodifiableList(entries))).build());
SnapshotsInProgress.of(entries)).build());
}
return ClusterTasksResult.<UpdateIndexShardSnapshotStatusRequest>builder().successes(tasks).build(currentState);
}