Improve Efficiency of SnapshotsService CS Apply () ()

This change removes the redundant submitting of two separate cluster state updates
for the node configuration changes and routing changes that affect snapshots.
Since we submitted the task to deal with node configuration changes every time on master
fail-over we could also move the BwC cleanup loop that removes `INIT` state snapshots as well
as snapshots that have all their shards completed into this cluster state update task.

Aside from improving efficiency overall this change has the fortunate side effect of moving
all snapshot finalization to the CS update thread. This is helpful for concurrent snapshots
since it makes it very natural and straight forward to order snapshot finalizations by exploiting
that they are all initiated on the same thread.
This commit is contained in:
Armin Braun 2020-07-14 11:49:09 +02:00 committed by GitHub
parent c8290167a0
commit 81e96954d0
No known key found for this signature in database

@ -90,6 +90,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -101,7 +102,6 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableList;
@ -744,21 +744,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false;
if (snapshotsInProgress != null) {
if (newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes())) {
if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) {
if (newMaster) {
// Cleanup all snapshots that have no more work left:
// 1. Completed snapshots
// 2. Snapshots in state INIT that the previous master failed to start
// 3. Snapshots in any other state that have all their shard tasks completed
entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values())
).forEach(entry -> endSnapshot(entry, event.state().metadata()));
processExternalChanges(newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()),
event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event));
if (newMaster) {
@ -832,64 +819,70 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
* Cleans up shard snapshots that were running on removed nodes
* Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (master fail-over or
* disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is
* {@link SnapshotsInProgress.ShardState#WAITING}.
* @param changedNodes true iff either a master fail-over occurred or a data node that was doing snapshot work got removed from the
* cluster
* @param startShards true iff any waiting shards were started due to a routing change
private void processSnapshotsOnRemovedNodes() {
clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() {
private void processExternalChanges(boolean changedNodes, boolean startShards) {
if (changedNodes == false && startShards == false) {
// nothing to do, no relevant external change happened
clusterService.submitStateUpdateTask("update snapshot after shards started [" + startShards +
"] or node configuration changed [" + changedNodes + "]", new ClusterStateUpdateTask() {
private final Collection<SnapshotsInProgress.Entry> finishedSnapshots = new ArrayList<>();
public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
assert snapshots != null : "We only submit this kind of update if there have been snapshots before";
DiscoveryNodes nodes = currentState.nodes();
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
if (snapshots == null) {
return currentState;
boolean changed = false;
final EnumSet<State> statesToUpdate;
// If we are reacting to a change in the cluster node configuration we have to update the shard states of both started and
// aborted snapshots to potentially fail shards running on the removed nodes
if (changedNodes) {
statesToUpdate = EnumSet.of(State.STARTED, State.ABORTED);
} else {
// We are reacting to shards that started only so which only affects the individual shard states of started snapshots
statesToUpdate = EnumSet.of(State.STARTED);
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
boolean snapshotChanged = false;
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshot.shards()) {
final ShardSnapshotStatus shardStatus = shardEntry.value;
final ShardId shardId = shardEntry.key;
if (!shardStatus.state().completed() && shardStatus.nodeId() != null) {
if (nodes.nodeExists(shardStatus.nodeId())) {
shards.put(shardId, shardStatus);
} else {
// TODO: Restart snapshot on another node?
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
shardId, shardStatus.nodeId());
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown",
} else {
shards.put(shardId, shardStatus);
if (snapshotChanged) {
if (statesToUpdate.contains(snapshot.state())) {
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards =
processWaitingShardsAndRemovedNodes(snapshot.shards(), routingTable, nodes);
if (shards != null) {
final SnapshotsInProgress.Entry updatedSnapshot;
changed = true;
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shardsMap =;
if (!snapshot.state().completed() && completed(shardsMap.values())) {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap);
if (completed(shards.values())) {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards);
} else {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap);
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards);
} else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) {
changed = true;
// A snapshot in INIT state hasn't yet written anything to the repository so we simply remove it
// from the cluster state without any further cleanup
} else {
} else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) {
// BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet
// write anything to the repository physically. This means we can simply remove these from the cluster state
// without having to do any additional cleanup.
changed = true;
logger.debug("[{}] was found in dangling INIT or ABORTED state", snapshot);
} else {
if (snapshot.state().completed() || completed(snapshot.shards().values())) {
assert updatedSnapshot.shards().size() == snapshot.shards().size()
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
if (changed) {
return ClusterState.builder(currentState)
@ -900,7 +893,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
public void onFailure(String source, Exception e) {
logger.warn("failed to update snapshot state after node removal");
logger.warn(() -> new ParameterizedMessage(
"failed to update snapshot state after shards started or nodes removed from [{}] ", source), e);
@ -910,58 +904,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private void processStartedShards() {
clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() {
private final Collection<SnapshotsInProgress.Entry> finishedSnapshots = new ArrayList<>();
public ClusterState execute(ClusterState currentState) {
RoutingTable routingTable = currentState.routingTable();
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
boolean changed = false;
if (snapshots != null) {
ArrayList<SnapshotsInProgress.Entry> entries = new ArrayList<>();
for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) {
SnapshotsInProgress.Entry updatedSnapshot = snapshot;
if (snapshot.state() == State.STARTED) {
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShards(snapshot.shards(),
if (shards != null) {
changed = true;
if (!snapshot.state().completed() && completed(shards.values())) {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards);
} else {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards);
if (changed) {
return ClusterState.builder(currentState)
.putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build();
return currentState;
public void onFailure(String source, Exception e) {
logger.warn(() ->
new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e);
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
finishedSnapshots.forEach(entry -> endSnapshot(entry, newState.metadata()));
private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShards(
ImmutableOpenMap<ShardId, ShardSnapshotStatus> snapshotShards, RoutingTable routingTable) {
private static ImmutableOpenMap<ShardId, ShardSnapshotStatus> processWaitingShardsAndRemovedNodes(
ImmutableOpenMap<ShardId, ShardSnapshotStatus> snapshotShards, RoutingTable routingTable, DiscoveryNodes nodes) {
boolean snapshotChanged = false;
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards) {
@ -991,6 +935,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
shards.put(shardId, new ShardSnapshotStatus(
shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned", shardStatus.generation()));
} else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) {
if (nodes.nodeExists(shardStatus.nodeId())) {
shards.put(shardId, shardStatus);
} else {
// TODO: Restart snapshot on another node?
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
shardId, shardStatus.nodeId());
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown", shardStatus.generation()));
} else {
shards.put(shardId, shardStatus);
@ -1026,11 +981,26 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List<DiscoveryNode> removedNodes) {
// If at least one shard was running on a removed node - we need to fail it
return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot -><ShardSnapshotStatus>) () -> snapshot.shards().valuesIt()).spliterator(), false)
.filter(s -> s.state().completed() == false).map(ShardSnapshotStatus::nodeId))
if (removedNodes.isEmpty()) {
// Nothing to do, no nodes removed
return false;
final Set<String> removedNodeIds =;
return snapshotsInProgress.entries().stream()
.anyMatch(snapshot -> {
if (snapshot.state().completed()) {
// nothing to do for already completed snapshots
return false;
for (ObjectCursor<ShardSnapshotStatus> shardStatus : snapshot.shards().values()) {
final ShardSnapshotStatus shardSnapshotStatus = shardStatus.value;
if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) {
// Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status
return true;
return false;
@ -1830,7 +1800,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (endingSnapshots.contains(request.snapshot()) == false) {
final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot());
if (updatedEntry.state().completed()) {
// If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo
if (updatedEntry != null && updatedEntry.state().completed()) {
endSnapshot(updatedEntry, newState.metadata());