From 81e96954d01f8001affef52489f761c6ef1db164 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 14 Jul 2020 11:49:09 +0200 Subject: [PATCH] Improve Efficiency of SnapshotsService CS Apply (#56874) (#59508) This change removes the redundant submitting of two separate cluster state updates for the node configuration changes and routing changes that affect snapshots. Since we submitted the task to deal with node configuration changes every time on master fail-over we could also move the BwC cleanup loop that removes `INIT` state snapshots as well as snapshots that have all their shards completed into this cluster state update task. Aside from improving efficiency overall this change has the fortunate side effect of moving all snapshot finalization to the CS update thread. This is helpful for concurrent snapshots since it makes it very natural and straight forward to order snapshot finalizations by exploiting that they are all initiated on the same thread. --- .../snapshots/SnapshotsService.java | 203 ++++++++---------- 1 file changed, 87 insertions(+), 116 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 34e942d706a..7959901b089 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -90,6 +90,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -101,7 +102,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; -import java.util.stream.StreamSupport; import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableList; @@ -744,21 +744,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); final boolean newMaster = event.previousState().nodes().isLocalNodeElectedMaster() == false; if (snapshotsInProgress != null) { - if (newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes())) { - processSnapshotsOnRemovedNodes(); - } - if (event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)) { - processStartedShards(); - } - if (newMaster) { - // Cleanup all snapshots that have no more work left: - // 1. Completed snapshots - // 2. Snapshots in state INIT that the previous master failed to start - // 3. Snapshots in any other state that have all their shard tasks completed - snapshotsInProgress.entries().stream().filter( - entry -> entry.state().completed() || entry.state() == State.INIT || completed(entry.shards().values()) - ).forEach(entry -> endSnapshot(entry, event.state().metadata())); - } + processExternalChanges(newMaster || removedNodesCleanupNeeded(snapshotsInProgress, event.nodesDelta().removedNodes()), + event.routingTableChanged() && waitingShardsStartedOrUnassigned(snapshotsInProgress, event)); } if (newMaster) { finalizeSnapshotDeletionFromPreviousMaster(event.state()); @@ -832,64 +819,70 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } /** - * Cleans up shard snapshots that were running on removed nodes + * Updates the state of in-progress snapshots in reaction to a change in the configuration of the cluster nodes (master fail-over or + * disconnect of a data node that was executing a snapshot) or a routing change that started shards whose snapshot state is + * {@link SnapshotsInProgress.ShardState#WAITING}. + * + * @param changedNodes true iff either a master fail-over occurred or a data node that was doing snapshot work got removed from the + * cluster + * @param startShards true iff any waiting shards were started due to a routing change */ - private void processSnapshotsOnRemovedNodes() { - clusterService.submitStateUpdateTask("update snapshot state after node removal", new ClusterStateUpdateTask() { + private void processExternalChanges(boolean changedNodes, boolean startShards) { + if (changedNodes == false && startShards == false) { + // nothing to do, no relevant external change happened + return; + } + clusterService.submitStateUpdateTask("update snapshot after shards started [" + startShards + + "] or node configuration changed [" + changedNodes + "]", new ClusterStateUpdateTask() { private final Collection finishedSnapshots = new ArrayList<>(); @Override public ClusterState execute(ClusterState currentState) { + RoutingTable routingTable = currentState.routingTable(); + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + assert snapshots != null : "We only submit this kind of update if there have been snapshots before"; DiscoveryNodes nodes = currentState.nodes(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - if (snapshots == null) { - return currentState; - } boolean changed = false; + final EnumSet statesToUpdate; + // If we are reacting to a change in the cluster node configuration we have to update the shard states of both started and + // aborted snapshots to potentially fail shards running on the removed nodes + if (changedNodes) { + statesToUpdate = EnumSet.of(State.STARTED, State.ABORTED); + } else { + // We are reacting to shards that started only so which only affects the individual shard states of started snapshots + statesToUpdate = EnumSet.of(State.STARTED); + } ArrayList entries = new ArrayList<>(); for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { - SnapshotsInProgress.Entry updatedSnapshot = snapshot; - if (snapshot.state() == State.STARTED || snapshot.state() == State.ABORTED) { - ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - boolean snapshotChanged = false; - for (ObjectObjectCursor shardEntry : snapshot.shards()) { - final ShardSnapshotStatus shardStatus = shardEntry.value; - final ShardId shardId = shardEntry.key; - if (!shardStatus.state().completed() && shardStatus.nodeId() != null) { - if (nodes.nodeExists(shardStatus.nodeId())) { - shards.put(shardId, shardStatus); - } else { - // TODO: Restart snapshot on another node? - snapshotChanged = true; - logger.warn("failing snapshot of shard [{}] on closed node [{}]", - shardId, shardStatus.nodeId()); - shards.put(shardId, - new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown", - shardStatus.generation())); - } - } else { - shards.put(shardId, shardStatus); - } - } - if (snapshotChanged) { + if (statesToUpdate.contains(snapshot.state())) { + ImmutableOpenMap shards = + processWaitingShardsAndRemovedNodes(snapshot.shards(), routingTable, nodes); + if (shards != null) { + final SnapshotsInProgress.Entry updatedSnapshot; changed = true; - ImmutableOpenMap shardsMap = shards.build(); - if (!snapshot.state().completed() && completed(shardsMap.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shardsMap); + if (completed(shards.values())) { + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); finishedSnapshots.add(updatedSnapshot); } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, snapshot.state(), shardsMap); + updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); } + entries.add(updatedSnapshot); + } else { + entries.add(snapshot); } - entries.add(updatedSnapshot); - } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { + } else if (snapshot.repositoryStateId() == RepositoryData.UNKNOWN_REPO_GEN) { + // BwC path, older versions could create entries with unknown repo GEN in INIT or ABORTED state that did not yet + // write anything to the repository physically. This means we can simply remove these from the cluster state + // without having to do any additional cleanup. changed = true; - // A snapshot in INIT state hasn't yet written anything to the repository so we simply remove it - // from the cluster state without any further cleanup + logger.debug("[{}] was found in dangling INIT or ABORTED state", snapshot); + } else { + if (snapshot.state().completed() || completed(snapshot.shards().values())) { + finishedSnapshots.add(snapshot); + } + entries.add(snapshot); } - assert updatedSnapshot.shards().size() == snapshot.shards().size() - : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; } if (changed) { return ClusterState.builder(currentState) @@ -900,7 +893,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void onFailure(String source, Exception e) { - logger.warn("failed to update snapshot state after node removal"); + logger.warn(() -> new ParameterizedMessage( + "failed to update snapshot state after shards started or nodes removed from [{}] ", source), e); } @Override @@ -910,58 +904,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus }); } - private void processStartedShards() { - clusterService.submitStateUpdateTask("update snapshot state after shards started", new ClusterStateUpdateTask() { - - private final Collection finishedSnapshots = new ArrayList<>(); - - @Override - public ClusterState execute(ClusterState currentState) { - RoutingTable routingTable = currentState.routingTable(); - SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - boolean changed = false; - if (snapshots != null) { - ArrayList entries = new ArrayList<>(); - for (final SnapshotsInProgress.Entry snapshot : snapshots.entries()) { - SnapshotsInProgress.Entry updatedSnapshot = snapshot; - if (snapshot.state() == State.STARTED) { - ImmutableOpenMap shards = processWaitingShards(snapshot.shards(), - routingTable); - if (shards != null) { - changed = true; - if (!snapshot.state().completed() && completed(shards.values())) { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards); - finishedSnapshots.add(updatedSnapshot); - } else { - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards); - } - } - entries.add(updatedSnapshot); - } - } - if (changed) { - return ClusterState.builder(currentState) - .putCustom(SnapshotsInProgress.TYPE, SnapshotsInProgress.of(entries)).build(); - } - } - return currentState; - } - - @Override - public void onFailure(String source, Exception e) { - logger.warn(() -> - new ParameterizedMessage("failed to update snapshot state after shards started from [{}] ", source), e); - } - - @Override - public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - finishedSnapshots.forEach(entry -> endSnapshot(entry, newState.metadata())); - } - }); - } - - private static ImmutableOpenMap processWaitingShards( - ImmutableOpenMap snapshotShards, RoutingTable routingTable) { + private static ImmutableOpenMap processWaitingShardsAndRemovedNodes( + ImmutableOpenMap snapshotShards, RoutingTable routingTable, DiscoveryNodes nodes) { boolean snapshotChanged = false; ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); for (ObjectObjectCursor shardEntry : snapshotShards) { @@ -991,6 +935,17 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId()); shards.put(shardId, new ShardSnapshotStatus( shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned", shardStatus.generation())); + } else if (shardStatus.state().completed() == false && shardStatus.nodeId() != null) { + if (nodes.nodeExists(shardStatus.nodeId())) { + shards.put(shardId, shardStatus); + } else { + // TODO: Restart snapshot on another node? + snapshotChanged = true; + logger.warn("failing snapshot of shard [{}] on closed node [{}]", + shardId, shardStatus.nodeId()); + shards.put(shardId, + new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown", shardStatus.generation())); + } } else { shards.put(shardId, shardStatus); } @@ -1026,11 +981,26 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } private static boolean removedNodesCleanupNeeded(SnapshotsInProgress snapshotsInProgress, List removedNodes) { - // If at least one shard was running on a removed node - we need to fail it - return removedNodes.isEmpty() == false && snapshotsInProgress.entries().stream().flatMap(snapshot -> - StreamSupport.stream(((Iterable) () -> snapshot.shards().valuesIt()).spliterator(), false) - .filter(s -> s.state().completed() == false).map(ShardSnapshotStatus::nodeId)) - .anyMatch(removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet())::contains); + if (removedNodes.isEmpty()) { + // Nothing to do, no nodes removed + return false; + } + final Set removedNodeIds = removedNodes.stream().map(DiscoveryNode::getId).collect(Collectors.toSet()); + return snapshotsInProgress.entries().stream() + .anyMatch(snapshot -> { + if (snapshot.state().completed()) { + // nothing to do for already completed snapshots + return false; + } + for (ObjectCursor shardStatus : snapshot.shards().values()) { + final ShardSnapshotStatus shardSnapshotStatus = shardStatus.value; + if (shardSnapshotStatus.state().completed() == false && removedNodeIds.contains(shardSnapshotStatus.nodeId())) { + // Snapshot had an incomplete shard running on a removed node so we need to adjust that shard's snapshot status + return true; + } + } + return false; + }); } /** @@ -1830,7 +1800,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (endingSnapshots.contains(request.snapshot()) == false) { final SnapshotsInProgress snapshotsInProgress = newState.custom(SnapshotsInProgress.TYPE); final SnapshotsInProgress.Entry updatedEntry = snapshotsInProgress.snapshot(request.snapshot()); - if (updatedEntry.state().completed()) { + // If the entry is still in the cluster state and is completed, try finalizing the snapshot in the repo + if (updatedEntry != null && updatedEntry.state().completed()) { endSnapshot(updatedEntry, newState.metadata()); } }