diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index 11aa66710a9..f4c9d5a4215 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -213,6 +213,34 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement failure, userMetadata, version); } + /** + * Create a new instance that has its shard assignments replaced by the given shard assignment map. + * If the given shard assignments show all shard snapshots in a completed state then the returned instance will be of state + * {@link State#SUCCESS}, otherwise the state remains unchanged. + * + * @param shards new shard snapshot states + * @return new snapshot entry + */ + public Entry withShardStates(ImmutableOpenMap shards) { + if (completed(shards.values())) { + return new Entry(snapshot, includeGlobalState, partial, State.SUCCESS, indices, dataStreams, startTime, repositoryStateId, + shards, failure, userMetadata, version); + } + return withStartedShards(shards); + } + + /** + * Same as {@link #withShardStates} but does not check if the snapshot completed and thus is only to be used when starting new + * shard snapshots on data nodes for a running snapshot. + */ + public Entry withStartedShards(ImmutableOpenMap shards) { + final SnapshotsInProgress.Entry updated = new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, + startTime, repositoryStateId, shards, failure, userMetadata, version); + assert updated.state().completed() == false && completed(updated.shards().values()) == false + : "Only running snapshots allowed but saw [" + updated + "]"; + return updated; + } + @Override public String repository() { return snapshot.getRepository(); diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 812bbafce1e..049b1d4bcfd 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -2458,63 +2458,77 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus public ClusterTasksResult execute(ClusterState currentState, List tasks) { int changedCount = 0; + int startedCount = 0; final List entries = new ArrayList<>(); - final Map> reusedShardIdsByRepo = new HashMap<>(); + // Tasks to check for updates for running snapshots. + final List unconsumedTasks = new ArrayList<>(tasks); + // Tasks that were used to complete an existing in-progress shard snapshot + final Set executedTasks = new HashSet<>(); for (SnapshotsInProgress.Entry entry : currentState.custom(SnapshotsInProgress.TYPE, SnapshotsInProgress.EMPTY).entries()) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean updated = false; - - for (UpdateIndexShardSnapshotStatusRequest updateSnapshotState : tasks) { + if (entry.state().completed()) { + entries.add(entry); + continue; + } + ImmutableOpenMap.Builder shards = null; + for (Iterator iterator = unconsumedTasks.iterator(); iterator.hasNext(); ) { + final UpdateIndexShardSnapshotStatusRequest updateSnapshotState = iterator.next(); + final Snapshot updatedSnapshot = updateSnapshotState.snapshot(); + final String updatedRepository = updatedSnapshot.getRepository(); + if (entry.repository().equals(updatedRepository) == false) { + continue; + } final ShardId finishedShardId = updateSnapshotState.shardId(); - if (entry.snapshot().equals(updateSnapshotState.snapshot())) { - logger.trace("[{}] Updating shard [{}] with status [{}]", updateSnapshotState.snapshot(), + if (entry.snapshot().getSnapshotId().equals(updatedSnapshot.getSnapshotId())) { + final ShardSnapshotStatus existing = entry.shards().get(finishedShardId); + if (existing == null) { + logger.warn("Received shard snapshot status update [{}] but this shard is not tracked in [{}]", + updateSnapshotState, entry); + assert false : "This should never happen, data nodes should only send updates for expected shards"; + continue; + } + if (existing.state().completed()) { + // No point in doing noop updates that might happen if data nodes resends shard status after a disconnect. + iterator.remove(); + continue; + } + logger.trace("[{}] Updating shard [{}] with status [{}]", updatedSnapshot, finishedShardId, updateSnapshotState.status().state()); - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); } shards.put(finishedShardId, updateSnapshotState.status()); + executedTasks.add(updateSnapshotState); changedCount++; - } else { - final String updatedRepository = updateSnapshotState.snapshot().getRepository(); - final Set reusedShardIds = reusedShardIdsByRepo.computeIfAbsent(updatedRepository, k -> new HashSet<>()); - if (entry.state().completed() == false && entry.repository().equals(updatedRepository) - && reusedShardIds.contains(finishedShardId) == false) { - final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); - if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { - continue; - } - if (updated == false) { - shards.putAll(entry.shards()); - updated = true; - } - final ShardSnapshotStatus finishedStatus = updateSnapshotState.status(); - logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, - finishedStatus.nodeId(), finishedStatus.generation()); - shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); - reusedShardIds.add(finishedShardId); + } else if (executedTasks.contains(updateSnapshotState)) { + // tasks that completed a shard might allow starting a new shard snapshot for the current snapshot + final ShardSnapshotStatus existingStatus = entry.shards().get(finishedShardId); + if (existingStatus == null || existingStatus.state() != ShardState.QUEUED) { + continue; } + if (shards == null) { + shards = ImmutableOpenMap.builder(entry.shards()); + } + final ShardSnapshotStatus finishedStatus = updateSnapshotState.status(); + logger.trace("Starting [{}] on [{}] with generation [{}]", finishedShardId, + finishedStatus.nodeId(), finishedStatus.generation()); + shards.put(finishedShardId, new ShardSnapshotStatus(finishedStatus.nodeId(), finishedStatus.generation())); + iterator.remove(); + startedCount++; } } - if (updated) { - if (completed(shards.values()) == false) { - entries.add(new SnapshotsInProgress.Entry(entry, shards.build())); - } else { - // Snapshot is finished - mark it as done - // TODO: Add PARTIAL_SUCCESS status? - SnapshotsInProgress.Entry updatedEntry = new SnapshotsInProgress.Entry(entry, State.SUCCESS, shards.build()); - entries.add(updatedEntry); - } - } else { + if (shards == null) { entries.add(entry); + } else { + entries.add(entry.withShardStates(shards.build())); } } if (changedCount > 0) { - logger.trace("changed cluster state triggered by {} snapshot state updates", changedCount); + logger.trace("changed cluster state triggered by [{}] snapshot state updates and resulted in starting " + + "[{}] shard snapshots", changedCount, startedCount); return ClusterTasksResult.builder().successes(tasks) .build(ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, - SnapshotsInProgress.of(unmodifiableList(entries))).build()); + SnapshotsInProgress.of(entries)).build()); } return ClusterTasksResult.builder().successes(tasks).build(currentState); }