diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java index da416631bee..556237d4470 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/delete/TransportDeleteSnapshotAction.java @@ -53,7 +53,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction listener) { snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), - ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false); + ActionListener.map(listener, v -> new AcknowledgedResponse(true))); } } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java index 9c25c5578f3..7f18bf2d17c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotDeletionsInProgress.java @@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.repositories.RepositoryData; import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.snapshots.Snapshot; @@ -174,6 +175,8 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable i this.snapshot = snapshot; this.startTime = startTime; this.repositoryStateId = repositoryStateId; + assert repositoryStateId > RepositoryData.EMPTY_REPO_GEN : + "Can't delete based on an empty or unknown repository generation but saw [" + repositoryStateId + "]"; } public Entry(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 9fa55bf90bb..f3e7d3e267c 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -696,22 +696,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus entries.add(updatedSnapshot); } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { changed = true; - // Mark the snapshot as aborted as it failed to start from the previous master - updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); - entries.add(updatedSnapshot); - - // Clean up the snapshot that failed to start from the old master - deleteSnapshot(snapshot.snapshot(), new ActionListener() { - @Override - public void onResponse(Void aVoid) { - logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - - @Override - public void onFailure(Exception e) { - logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot()); - } - }, updatedSnapshot.repositoryStateId(), false); + // A snapshot in INIT state hasn't yet written anything to the repository so we simply remove it + // from the cluster state without any further cleanup } assert updatedSnapshot.shards().size() == snapshot.shards().size() : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; @@ -1033,61 +1019,182 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } /** - * Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting. - * If the snapshot is still running cancels the snapshot first and then deletes it from the repository. + * Deletes a snapshot from the repository or aborts a running snapshot. + * First checks if the snapshot is still running and if so cancels the snapshot and then deletes it from the repository. + * If the snapshot is not running, moves to trying to find a matching {@link Snapshot} for the given name in the repository and if + * one is found deletes it by invoking {@link #deleteCompletedSnapshot}. * * @param repositoryName repositoryName * @param snapshotName snapshotName * @param listener listener */ - public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener listener, - final boolean immediatePriority) { - // First, look for the snapshot in the repository - final Repository repository = repositoriesService.repository(repositoryName); - repository.getRepositoryData(ActionListener.wrap(repositoryData -> { - Optional matchedEntry = repositoryData.getSnapshotIds() - .stream() - .filter(s -> s.getName().equals(snapshotName)) - .findFirst(); - // if nothing found by the same name, then look in the cluster state for current in progress snapshots - long repoGenId = repositoryData.getGenId(); - if (matchedEntry.isPresent() == false) { - Optional matchedInProgress = currentSnapshots( - clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream() - .filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst(); - if (matchedInProgress.isPresent()) { - matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId()); - // Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes - repoGenId = matchedInProgress.get().repositoryStateId() + 1L; - } - } - if (matchedEntry.isPresent() == false) { - throw new SnapshotMissingException(repositoryName, snapshotName); - } - deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority); - }, listener::onFailure)); - } + public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener listener) { + logger.info("deleting snapshot [{}] from repository [{}]", snapshotName, repositoryName); - /** - * Deletes snapshot from repository. - *

- * If the snapshot is still running cancels the snapshot first and then deletes it from the repository. - * - * @param snapshot snapshot - * @param listener listener - * @param repositoryStateId the unique id for the state of the repository - */ - private void deleteSnapshot(final Snapshot snapshot, final ActionListener listener, final long repositoryStateId, - final boolean immediatePriority) { - Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL; - logger.info("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", - snapshot, repositoryStateId, priority); - clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { + clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) { - boolean waitForSnapshot = false; + Snapshot runningSnapshot; boolean abortedDuringInit = false; + @Override + public ClusterState execute(ClusterState currentState) { + final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); + final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName); + if (snapshotEntry == null) { + return currentState; + } + runningSnapshot = snapshotEntry.snapshot(); + final ImmutableOpenMap shards; + + final State state = snapshotEntry.state(); + final String failure; + if (state == State.INIT) { + // snapshot is still initializing, mark it as aborted + shards = snapshotEntry.shards(); + assert shards.isEmpty(); + failure = "Snapshot was aborted during initialization"; + abortedDuringInit = true; + } else if (state == State.STARTED) { + // snapshot is started - mark every non completed shard as aborted + final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); + for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { + ShardSnapshotStatus status = shardEntry.value; + if (status.state().completed() == false) { + status = new ShardSnapshotStatus( + status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation()); + } + shardsBuilder.put(shardEntry.key, status); + } + shards = shardsBuilder.build(); + failure = "Snapshot was aborted by deletion"; + } else { + boolean hasUncompletedShards = false; + // Cleanup in case a node gone missing and snapshot wasn't updated for some reason + for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { + // Check if we still have shard running on existing nodes + if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null + && currentState.nodes().get(shardStatus.value.nodeId()) != null) { + hasUncompletedShards = true; + break; + } + } + if (hasUncompletedShards) { + // snapshot is being finalized - wait for shards to complete finalization process + logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); + return currentState; + } else { + // no shards to wait for but a node is gone - this is the only case + // where we force to finish the snapshot + logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); + shards = snapshotEntry.shards(); + } + failure = snapshotEntry.failure(); + } + return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE, + new SnapshotsInProgress(snapshots.entries().stream().map(existing -> { + if (existing.equals(snapshotEntry)) { + return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); + } + return existing; + }).collect(Collectors.toList()))).build(); + } + + @Override + public void onFailure(String source, Exception e) { + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + if (runningSnapshot == null) { + tryDeleteExisting(Priority.NORMAL); + return; + } + logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); + addListener(runningSnapshot, ActionListener.wrap( + snapshotInfo -> { + logger.debug("deleted snapshot completed - deleting files"); + tryDeleteExisting(Priority.IMMEDIATE); + }, + e -> { + if (abortedDuringInit) { + logger.info("Successfully aborted snapshot [{}]", runningSnapshot); + listener.onResponse(null); + } else { + if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) + != null) { + logger.warn("master failover before deleted snapshot could complete", e); + // Just pass the exception to the transport handler as is so it is retried on the new master + listener.onFailure(e); + } else { + logger.warn("deleted snapshot failed", e); + listener.onFailure( + new SnapshotMissingException(runningSnapshot.getRepository(), runningSnapshot.getSnapshotId(), e)); + } + } + } + )); + } + + private void tryDeleteExisting(Priority priority) { + threadPool.generic().execute(ActionRunnable.wrap(listener, l -> + repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> { + Optional matchedEntry = repositoryData.getSnapshotIds() + .stream() + .filter(s -> s.getName().equals(snapshotName)) + .findFirst(); + // If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in the + // repository is not the one we expected to find when waiting for a finishing snapshot we fail. + // Note: Not finding a snapshot we expected to find is practically impossible as it would imply that the snapshot + // we waited for was concurrently deleted and another snapshot by the same name concurrently created + // during the context switch from the cluster state thread to the snapshot thread. We still guard against the + // possibility as a safety measure. + if (matchedEntry.isPresent() == false + || (runningSnapshot != null && matchedEntry.get().equals(runningSnapshot.getSnapshotId()) == false)) { + if (runningSnapshot != null && matchedEntry.isPresent()) { + logger.warn("Waited for snapshot [{}}] but found snapshot [{}] in repository [{}]", + runningSnapshot.getSnapshotId(), matchedEntry.get(), repositoryName); + } + l.onFailure(new SnapshotMissingException(repositoryName, snapshotName)); + } else { + deleteCompletedSnapshot( + new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), priority, l); + } + }, l::onFailure)))); + } + }); + } + + // Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found + @Nullable + private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName, + String repositoryName) { + if (snapshots == null) { + return null; + } + SnapshotsInProgress.Entry snapshotEntry = null; + for (SnapshotsInProgress.Entry entry : snapshots.entries()) { + if (entry.repository().equals(repositoryName) + && entry.snapshot().getSnapshotId().getName().equals(snapshotName)) { + snapshotEntry = entry; + break; + } + } + return snapshotEntry; + } + + /** + * Deletes a snapshot that is assumed to be in the repository and not tracked as in-progress in the cluster state. + * + * @param snapshot Snapshot to delete + * @param repositoryStateId Repository generation to base the delete on + * @param listener Listener to complete when done + */ + private void deleteCompletedSnapshot(Snapshot snapshot, long repositoryStateId, Priority priority, ActionListener listener) { + logger.debug("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", snapshot, repositoryStateId, + priority); + clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) { @Override public ClusterState execute(ClusterState currentState) { SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); @@ -1113,81 +1220,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus } } } - ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState); SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); - SnapshotsInProgress.Entry snapshotEntry = snapshots != null ? snapshots.snapshot(snapshot) : null; - if (snapshotEntry == null) { - // This snapshot is not running - delete - if (snapshots != null && !snapshots.entries().isEmpty()) { - // However other snapshots are running - cannot continue - throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete"); - } - // add the snapshot deletion to the cluster state - SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry( - snapshot, - threadPool.absoluteTimeInMillis(), - repositoryStateId - ); - if (deletionsInProgress != null) { - deletionsInProgress = deletionsInProgress.withAddedEntry(entry); - } else { - deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry); - } - clusterStateBuilder.putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress); - } else { - // This snapshot is currently running - stopping shards first - waitForSnapshot = true; - - final ImmutableOpenMap shards; - - final State state = snapshotEntry.state(); - final String failure; - if (state == State.INIT) { - // snapshot is still initializing, mark it as aborted - shards = snapshotEntry.shards(); - assert shards.isEmpty(); - failure = "Snapshot was aborted during initialization"; - abortedDuringInit = true; - } else if (state == State.STARTED) { - // snapshot is started - mark every non completed shard as aborted - final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); - for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { - ShardSnapshotStatus status = shardEntry.value; - if (status.state().completed() == false) { - status = new ShardSnapshotStatus( - status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation()); - } - shardsBuilder.put(shardEntry.key, status); - } - shards = shardsBuilder.build(); - failure = "Snapshot was aborted by deletion"; - } else { - boolean hasUncompletedShards = false; - // Cleanup in case a node gone missing and snapshot wasn't updated for some reason - for (ObjectCursor shardStatus : snapshotEntry.shards().values()) { - // Check if we still have shard running on existing nodes - if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null - && currentState.nodes().get(shardStatus.value.nodeId()) != null) { - hasUncompletedShards = true; - break; - } - } - if (hasUncompletedShards) { - // snapshot is being finalized - wait for shards to complete finalization process - logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes"); - return currentState; - } else { - // no shards to wait for but a node is gone - this is the only case - // where we force to finish the snapshot - logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately"); - shards = snapshotEntry.shards(); - } - failure = snapshotEntry.failure(); - } - SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure); - clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot)); + if (snapshots != null && snapshots.entries().isEmpty() == false) { + // However other snapshots are running - cannot continue + throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete"); } - return clusterStateBuilder.build(); + // add the snapshot deletion to the cluster state + SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry( + snapshot, + threadPool.absoluteTimeInMillis(), + repositoryStateId + ); + if (deletionsInProgress != null) { + deletionsInProgress = deletionsInProgress.withAddedEntry(entry); + } else { + deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry); + } + return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress).build(); } @Override @@ -1197,42 +1246,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - if (waitForSnapshot) { - logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish"); - addListener(snapshot, ActionListener.wrap( - snapshotInfo -> { - logger.debug("deleted snapshot completed - deleting files"); - threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> { - try { - deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true); - } catch (Exception ex) { - logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex); - } - } - ); - }, - e -> { - if (abortedDuringInit) { - logger.debug(() -> new ParameterizedMessage("Snapshot [{}] was aborted during INIT", snapshot), e); - listener.onResponse(null); - } else { - if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class) - != null) { - logger.warn("master failover before deleted snapshot could complete", e); - // Just pass the exception to the transport handler as is so it is retried on the new master - listener.onFailure(e); - } else { - logger.warn("deleted snapshot failed", e); - listener.onFailure( - new SnapshotMissingException(snapshot.getRepository(), snapshot.getSnapshotId(), e)); - } - } - } - )); - } else { - logger.debug("deleted snapshot is not running - deleting files"); - deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); - } + deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion()); } }); } @@ -1352,6 +1366,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (failure != null) { listener.onFailure(failure); } else { + logger.info("Successfully deleted snapshot [{}]", snapshot); listener.onResponse(null); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ad551ad590c..fb8355885d4 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -89,9 +89,11 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ClusterStateListener; import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.SnapshotsInProgress; @@ -428,8 +430,16 @@ public class SnapshotResiliencyTests extends ESTestCase { final StepListener deleteSnapshotStepListener = new StepListener<>(); - continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot( - new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); + masterNode.clusterService.addListener(new ClusterStateListener() { + @Override + public void clusterChanged(ClusterChangedEvent event) { + final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE); + if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) { + client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener); + masterNode.clusterService.removeListener(this); + } + } + }); final StepListener createAnotherSnapshotResponseStepListener = new StepListener<>(); @@ -440,6 +450,7 @@ public class SnapshotResiliencyTests extends ESTestCase { deterministicTaskQueue.runAllRunnableTasks(); + assertNotNull(createSnapshotResponseStepListener.result()); assertNotNull(createAnotherSnapshotResponseStepListener.result()); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));