From d8b43c62838564a700d0ff99f9787bda875997c1 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Wed, 15 Apr 2020 14:47:16 +0200 Subject: [PATCH] Make Snapshot Deletes Less Racy (#54765) (#55226) Snapshot deletes should first check the cluster state for an in-progress snapshot and try to abort it before checking the repository contents. This allows for atomically checking and aborting a snapshot in the same cluster state update, removing all possible races where a snapshot that is in-progress could not be found if it finishes between checking the repository contents and the cluster state. Also removes confusing races, where checking the cluster state off of the cluster state thread finds an in-progress snapshot that is then not found in the cluster state update to abort it. Finally, the logic to use the repository generation of the in-progress snapshot + 1 was error prone because it would always fail the delete when the repository had a pending generation different from its safe generation when a snapshot started (leading to the snapshot finalizing at a higher generation). These issues (particularly that last point) can easily be reproduced by running `SLMSnapshotBlockingIntegTests` in a loop with current `master` (see #54766). The snapshot resiliency test for concurrent snapshot creation and deletion was made to more aggressively start the delete operation so that the above races would become visible. Previously, the fact that deletes would never coincide with initializing snapshots resulted in a number of the above races not reproducing. This PR is the most consistent I could get snapshot deletes without changes to the state machine. The fact that aborted deletes will not put the delete operation in the cluster state before waiting for the snapshot to abort still allows for some possible (though practically very unlikely) races. These will be fixed by a state-machine change in upcoming work in #54705 (which will have a much simpler and clearer diff after this change). Closes #54766 --- .../delete/TransportDeleteSnapshotAction.java | 4 +- .../cluster/SnapshotDeletionsInProgress.java | 3 + .../snapshots/SnapshotsService.java | 355 +++++++++--------- .../snapshots/SnapshotResiliencyTests.java | 15 +- 4 files changed, 203 insertions(+), 174 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index da416631bee..556237d4470 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -53,7 +53,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction listener) { snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), - ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false); + ActionListener.map(listener, v -> new AcknowledgedResponse(true))); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 9c25c5578f3..7f18bf2d17c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; @@ -174,6 +175,8 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable i this.snapshot = snapshot; this.startTime = startTime; this.repositoryStateId = repositoryStateId; + assert repositoryStateId > RepositoryData.EMPTY_REPO_GEN : + "Can't delete based on an empty or unknown repository generation but saw [" + repositoryStateId + "]"; } public Entry(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 9fa55bf90bb..f3e7d3e267c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -696,22 +696,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus entries.add(updatedSnapshot); } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { changed = true; - // Mark the snapshot as aborted as it failed to start from the previous master - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); - entries.add(updatedSnapshot); - - // Clean up the snapshot that failed to start from the old master - deleteSnapshot(snapshot.snapshot(), new ActionListener() { - @Override - public void onResponse(Void aVoid) { - logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - - @Override - public void onFailure(Exception e) { - logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - }, updatedSnapshot.repositoryStateId(), false); + // A snapshot in INIT state hasn't yet written anything to the repository so we simply remove it + // from the cluster state without any further cleanup } assert updatedSnapshot.shards().size() == snapshot.shards().size() : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; @@ -1033,61 +1019,182 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } /** - * Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting. - * If the snapshot is still running cancels the snapshot first and then deletes it from the repository. + * Deletes a snapshot from the repository or aborts a running snapshot. + * First checks if the snapshot is still running and if so cancels the snapshot and then deletes it from the repository. + * If the snapshot is not running, moves to trying to find a matching {@link Snapshot} for the given name in the repository and if + * one is found deletes it by invoking {@link #deleteCompletedSnapshot}. * * @param repositoryName repositoryName * @param snapshotName snapshotName * @param listener listener */ - public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener listener, - final boolean immediatePriority) { - // First, look for the snapshot in the repository - final Repository repository = repositoriesService.repository(repositoryName); - repository.getRepositoryData(ActionListener.wrap(repositoryData -> { - Optional matchedEntry = repositoryData.getSnapshotIds() - .stream() - .filter(s -> s.getName().equals(snapshotName)) - .findFirst(); - // if nothing found by the same name, then look in the cluster state for current in progress snapshots - long repoGenId = repositoryData.getGenId(); - if (matchedEntry.isPresent() == false) { - Optional matchedInProgress = currentSnapshots( - clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream() - .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst(); - if (matchedInProgress.isPresent()) { - matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); - // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes - repoGenId = matchedInProgress.get().repositoryStateId() + 1L; - } - } - if (matchedEntry.isPresent() == false) { - throw new SnapshotMissingException(repositoryName, snapshotName); - } - deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority); - }, listener::onFailure)); - } + public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener listener) { + logger.info("deleting snapshot [{}] from repository [{}]", snapshotName, repositoryName); - /** - * Deletes snapshot from repository. - *

- * If the snapshot is still running cancels the snapshot first and then deletes it from the repository. - * - * @param snapshot snapshot - * @param listener listener - * @param repositoryStateId the unique id for the state of the repository - */ - private void deleteSnapshot(final Snapshot snapshot, final ActionListener listener, final long repositoryStateId, - final boolean immediatePriority) { - Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL; - logger.info("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", - snapshot, repositoryStateId, priority); - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { + clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) { - boolean waitForSnapshot = false; + Snapshot runningSnapshot; boolean abortedDuringInit = false; + @Override + public ClusterState execute(ClusterState currentState) { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName); + if (snapshotEntry == null) { + return currentState; + } + runningSnapshot = snapshotEntry.snapshot(); + final ImmutableOpenMap shards; + + final State state = snapshotEntry.state(); + final String failure; + if (state == State.INIT) { + // snapshot is still initializing, mark it as aborted + shards = snapshotEntry.shards(); + assert shards.isEmpty(); + failure = "Snapshot was aborted during initialization"; + abortedDuringInit = true; + } else if (state == State.STARTED) { + // snapshot is started - mark every non completed shard as aborted + final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); + for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { + ShardSnapshotStatus status = shardEntry.value; + if (status.state().completed() == false) { + status = new ShardSnapshotStatus( + status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation()); + } + shardsBuilder.put(shardEntry.key, status); + } + shards = shardsBuilder.build(); + failure = "Snapshot was aborted by deletion"; + } else { + boolean hasUncompletedShards = false; + // Cleanup in case a node gone missing and snapshot wasn't updated for some reason + for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { + // Check if we still have shard running on existing nodes + if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null + && currentState.nodes().get(shardStatus.value.nodeId()) != null) { + hasUncompletedShards = true; + break; + } + } + if (hasUncompletedShards) { + // snapshot is being finalized - wait for shards to complete finalization process + logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); + return currentState; + } else { + // no shards to wait for but a node is gone - this is the only case + // where we force to finish the snapshot + logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); + shards = snapshotEntry.shards(); + } + failure = snapshotEntry.failure(); + } + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + new SnapshotsInProgress(snapshots.entries().stream().map(existing -> { + if (existing.equals(snapshotEntry)) { + return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); + } + return existing; + }).collect(Collectors.toList()))).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (runningSnapshot == null) { + tryDeleteExisting(Priority.NORMAL); + return; + } + logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); + addListener(runningSnapshot, ActionListener.wrap( + snapshotInfo -> { + logger.debug("deleted snapshot completed - deleting files"); + tryDeleteExisting(Priority.IMMEDIATE); + }, + e -> { + if (abortedDuringInit) { + logger.info("Successfully aborted snapshot [{}]", runningSnapshot); + listener.onResponse(null); + } else { + if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) + != null) { + logger.warn("master failover before deleted snapshot could complete", e); + // Just pass the exception to the transport handler as is so it is retried on the new master + listener.onFailure(e); + } else { + logger.warn("deleted snapshot failed", e); + listener.onFailure( + new SnapshotMissingException(runningSnapshot.getRepository(), runningSnapshot.getSnapshotId(), e)); + } + } + } + )); + } + + private void tryDeleteExisting(Priority priority) { + threadPool.generic().execute(ActionRunnable.wrap(listener, l -> + repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> { + Optional matchedEntry = repositoryData.getSnapshotIds() + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst(); + // If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in the + // repository is not the one we expected to find when waiting for a finishing snapshot we fail. + // Note: Not finding a snapshot we expected to find is practically impossible as it would imply that the snapshot + // we waited for was concurrently deleted and another snapshot by the same name concurrently created + // during the context switch from the cluster state thread to the snapshot thread. We still guard against the + // possibility as a safety measure. + if (matchedEntry.isPresent() == false + || (runningSnapshot != null && matchedEntry.get().equals(runningSnapshot.getSnapshotId()) == false)) { + if (runningSnapshot != null && matchedEntry.isPresent()) { + logger.warn("Waited for snapshot [{}}] but found snapshot [{}] in repository [{}]", + runningSnapshot.getSnapshotId(), matchedEntry.get(), repositoryName); + } + l.onFailure(new SnapshotMissingException(repositoryName, snapshotName)); + } else { + deleteCompletedSnapshot( + new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), priority, l); + } + }, l::onFailure)))); + } + }); + } + + // Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found + @Nullable + private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName, + String repositoryName) { + if (snapshots == null) { + return null; + } + SnapshotsInProgress.Entry snapshotEntry = null; + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.repository().equals(repositoryName) + && entry.snapshot().getSnapshotId().getName().equals(snapshotName)) { + snapshotEntry = entry; + break; + } + } + return snapshotEntry; + } + + /** + * Deletes a snapshot that is assumed to be in the repository and not tracked as in-progress in the cluster state. + * + * @param snapshot Snapshot to delete + * @param repositoryStateId Repository generation to base the delete on + * @param listener Listener to complete when done + */ + private void deleteCompletedSnapshot(Snapshot snapshot, long repositoryStateId, Priority priority, ActionListener listener) { + logger.debug("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", snapshot, repositoryStateId, + priority); + clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { @Override public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); @@ -1113,81 +1220,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } } } - ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState); SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - SnapshotsInProgress.Entry snapshotEntry = snapshots != null ? snapshots.snapshot(snapshot) : null; - if (snapshotEntry == null) { - // This snapshot is not running - delete - if (snapshots != null && !snapshots.entries().isEmpty()) { - // However other snapshots are running - cannot continue - throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete"); - } - // add the snapshot deletion to the cluster state - SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry( - snapshot, - threadPool.absoluteTimeInMillis(), - repositoryStateId - ); - if (deletionsInProgress != null) { - deletionsInProgress = deletionsInProgress.withAddedEntry(entry); - } else { - deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry); - } - clusterStateBuilder.putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress); - } else { - // This snapshot is currently running - stopping shards first - waitForSnapshot = true; - - final ImmutableOpenMap shards; - - final State state = snapshotEntry.state(); - final String failure; - if (state == State.INIT) { - // snapshot is still initializing, mark it as aborted - shards = snapshotEntry.shards(); - assert shards.isEmpty(); - failure = "Snapshot was aborted during initialization"; - abortedDuringInit = true; - } else if (state == State.STARTED) { - // snapshot is started - mark every non completed shard as aborted - final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); - for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { - ShardSnapshotStatus status = shardEntry.value; - if (status.state().completed() == false) { - status = new ShardSnapshotStatus( - status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation()); - } - shardsBuilder.put(shardEntry.key, status); - } - shards = shardsBuilder.build(); - failure = "Snapshot was aborted by deletion"; - } else { - boolean hasUncompletedShards = false; - // Cleanup in case a node gone missing and snapshot wasn't updated for some reason - for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { - // Check if we still have shard running on existing nodes - if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null - && currentState.nodes().get(shardStatus.value.nodeId()) != null) { - hasUncompletedShards = true; - break; - } - } - if (hasUncompletedShards) { - // snapshot is being finalized - wait for shards to complete finalization process - logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); - return currentState; - } else { - // no shards to wait for but a node is gone - this is the only case - // where we force to finish the snapshot - logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); - shards = snapshotEntry.shards(); - } - failure = snapshotEntry.failure(); - } - SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); - clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot)); + if (snapshots != null && snapshots.entries().isEmpty() == false) { + // However other snapshots are running - cannot continue + throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete"); } - return clusterStateBuilder.build(); + // add the snapshot deletion to the cluster state + SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry( + snapshot, + threadPool.absoluteTimeInMillis(), + repositoryStateId + ); + if (deletionsInProgress != null) { + deletionsInProgress = deletionsInProgress.withAddedEntry(entry); + } else { + deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry); + } + return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress).build(); } @Override @@ -1197,42 +1246,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (waitForSnapshot) { - logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); - addListener(snapshot, ActionListener.wrap( - snapshotInfo -> { - logger.debug("deleted snapshot completed - deleting files"); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); - } catch (Exception ex) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); - } - } - ); - }, - e -> { - if (abortedDuringInit) { - logger.debug(() -> new ParameterizedMessage("Snapshot [{}] was aborted during INIT", snapshot), e); - listener.onResponse(null); - } else { - if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) - != null) { - logger.warn("master failover before deleted snapshot could complete", e); - // Just pass the exception to the transport handler as is so it is retried on the new master - listener.onFailure(e); - } else { - logger.warn("deleted snapshot failed", e); - listener.onFailure( - new SnapshotMissingException(snapshot.getRepository(), snapshot.getSnapshotId(), e)); - } - } - } - )); - } else { - logger.debug("deleted snapshot is not running - deleting files"); - deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); - } + deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); } }); } @@ -1352,6 +1366,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (failure != null) { listener.onFailure(failure); } else { + logger.info("Successfully deleted snapshot [{}]", snapshot); listener.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ad551ad590c..fb8355885d4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -89,9 +89,11 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -428,8 +430,16 @@ public class SnapshotResiliencyTests extends ESTestCase { final StepListener deleteSnapshotStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot( - new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); + masterNode.clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) { + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener); + masterNode.clusterService.removeListener(this); + } + } + }); final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); @@ -440,6 +450,7 @@ public class SnapshotResiliencyTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); + assertNotNull(createSnapshotResponseStepListener.result()); assertNotNull(createAnotherSnapshotResponseStepListener.result()); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));