diff --git a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java index 4bebd06ee3d..e2a60c58fea 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/snapshots/ConcurrentSnapshotsIT.java @@ -547,7 +547,8 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { // Second delete works out cleanly since the repo is unblocked now assertThat(secondDeleteFuture.get().isAcknowledged(), is(true)); // Snapshot should have been aborted - assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.FAILED)); + final SnapshotException snapshotException = expectThrows(SnapshotException.class, snapshotFuture::actionGet); + assertThat(snapshotException.getMessage(), containsString(SnapshotsInProgress.ABORTED_FAILURE_TEXT)); assertThat(client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(), empty()); } @@ -573,7 +574,8 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { // Second delete works out cleanly since the repo is unblocked now assertThat(secondDeleteFuture.get().isAcknowledged(), is(true)); // Snapshot should have been aborted - assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.FAILED)); + final SnapshotException snapshotException = expectThrows(SnapshotException.class, snapshotFuture::actionGet); + assertThat(snapshotException.getMessage(), containsString(SnapshotsInProgress.ABORTED_FAILURE_TEXT)); assertThat(client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(), empty()); } @@ -1223,6 +1225,30 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase { assertThat(sne.getCause().getMessage(), containsString("exception after block")); } + public void testAbortNotStartedSnapshotWithoutIO() throws Exception { + internalCluster().startMasterOnlyNode(); + final String dataNode = internalCluster().startDataOnlyNode(); + final String repoName = "test-repo"; + createRepository(repoName, "mock"); + createIndexWithContent("test-index"); + + final ActionFuture createSnapshot1Future = + startFullSnapshotBlockedOnDataNode("first-snapshot", repoName, dataNode); + + final String snapshotTwo = "second-snapshot"; + final ActionFuture createSnapshot2Future = startFullSnapshot(repoName, snapshotTwo); + + awaitNumberOfSnapshotsInProgress(2); + + assertAcked(startDeleteSnapshot(repoName, snapshotTwo).get()); + final SnapshotException sne = expectThrows(SnapshotException.class, createSnapshot2Future::actionGet); + + assertFalse(createSnapshot1Future.isDone()); + unblockNode(repoName, dataNode); + assertSuccessful(createSnapshot1Future); + assertThat(getRepositoryData(repoName).getGenId(), is(0L)); + } + private static String startDataNodeWithLargeSnapshotPool() { return internalCluster().startDataOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS); } diff --git a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java index b0e6604b2e4..dbd5afcd3f9 100644 --- a/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java +++ b/server/src/main/java/org/elasticsearch/cluster/SnapshotsInProgress.java @@ -66,6 +66,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public static final String TYPE = "snapshots"; + public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion"; + @Override public boolean equals(Object o) { if (this == o) return true; @@ -291,14 +293,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement * data node or to {@link ShardState#FAILED} if not assigned to any data node. * If the instance had no in-progress shard snapshots assigned to data nodes it's moved to state {@link State#SUCCESS}, otherwise * it's moved to state {@link State#ABORTED}. + * In the special case where this instance has not yet made any progress on any shard this method just returns + * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright. * - * @return aborted snapshot entry + * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly */ + @Nullable public Entry abort() { final ImmutableOpenMap.Builder shardsBuilder = ImmutableOpenMap.builder(); boolean completed = true; + boolean allQueued = true; for (ObjectObjectCursor shardEntry : shards) { ShardSnapshotStatus status = shardEntry.value; + allQueued &= status.state() == ShardState.QUEUED; if (status.state().completed() == false) { final String nodeId = status.nodeId(); status = new ShardSnapshotStatus(nodeId, nodeId == null ? ShardState.FAILED : ShardState.ABORTED, @@ -307,7 +314,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement completed &= status.state().completed(); shardsBuilder.put(shardEntry.key, status); } - return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, "Snapshot was aborted by deletion"); + if (allQueued) { + return null; + } + return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, ABORTED_FAILURE_TEXT); } public Entry fail(ImmutableOpenMap shards, State state, String failure) { diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 6e667f0dab3..3277b92946d 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -2037,7 +2037,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus private boolean reusedExistingDelete = false; - private final Collection completedSnapshots = new ArrayList<>(); + // Snapshots that had all of their shard snapshots in queued state and thus were removed from the + // cluster state right away + private final Collection completedNoCleanup = new ArrayList<>(); + + // Snapshots that were aborted and that already wrote data to the repository and now have to be deleted + // from the repository after the cluster state update + private final Collection completedWithCleanup = new ArrayList<>(); @Override public ClusterState execute(ClusterState currentState) { @@ -2079,25 +2085,41 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus "cannot delete snapshot while it is being cloned"); } } + // Snapshot ids that will have to be physically deleted from the repository + final Set snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds); final SnapshotsInProgress updatedSnapshots; if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) { updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream() - .map(existing -> { + .map(existing -> { + if (existing.state() == State.STARTED && + snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) { // snapshot is started - mark every non completed shard as aborted - if (existing.state() == State.STARTED && snapshotIds.contains(existing.snapshot().getSnapshotId())) { - final SnapshotsInProgress.Entry abortedEntry = existing.abort(); - if (abortedEntry.state().completed()) { - completedSnapshots.add(abortedEntry); + final SnapshotsInProgress.Entry abortedEntry = existing.abort(); + if (abortedEntry == null) { + // No work has been done for this snapshot yet so we remove it from the cluster state directly + final Snapshot existingNotYetStartedSnapshot = existing.snapshot(); + // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip + // any leaked listener assertions + if (endingSnapshots.add(existingNotYetStartedSnapshot)) { + completedNoCleanup.add(existingNotYetStartedSnapshot); } - return abortedEntry; + snapshotIdsRequiringCleanup.remove(existingNotYetStartedSnapshot.getSnapshotId()); + } else if (abortedEntry.state().completed()) { + completedWithCleanup.add(abortedEntry); } - return existing; - }).collect(Collectors.toList())); + return abortedEntry; + } + return existing; + }).filter(Objects::nonNull).collect(Collectors.toList())); + if (snapshotIdsRequiringCleanup.isEmpty()) { + // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions + return updateWithSnapshots(currentState, updatedSnapshots, null); + } } else { if (snapshots.entries().isEmpty() == false) { // However other snapshots are running - cannot continue throw new ConcurrentSnapshotExecutionException( - repoName, snapshotIds.toString(), "another snapshot is currently running cannot delete"); + repoName, snapshotIds.toString(), "another snapshot is currently running cannot delete"); } updatedSnapshots = snapshots; } @@ -2115,9 +2137,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus reusedExistingDelete = true; return currentState; } - ensureBelowConcurrencyLimit(repoName, snapshotIds.get(0).getName(), snapshots, deletionsInProgress); + final List toDelete = Collections.unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup)); + ensureBelowConcurrencyLimit(repoName, toDelete.get(0).getName(), snapshots, deletionsInProgress); newDelete = new SnapshotDeletionsInProgress.Entry( - snapshotIds, + toDelete, repoName, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), @@ -2127,7 +2150,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus repoName.equals(entry.repository()) && entry.state() == SnapshotDeletionsInProgress.State.STARTED) ? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING); } else { - newDelete = replacedEntry.withAddedSnapshots(snapshotIds); + newDelete = replacedEntry.withAddedSnapshots(snapshotIdsRequiringCleanup); } return updateWithSnapshots(currentState, updatedSnapshots, (replacedEntry == null ? deletionsInProgress : deletionsInProgress.withRemovedEntry(replacedEntry.uuid())) @@ -2136,24 +2159,36 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus @Override public void onFailure(String source, Exception e) { + endingSnapshots.removeAll(completedNoCleanup); listener.onFailure(e); } @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - addDeleteListener(newDelete.uuid(), listener); - if (reusedExistingDelete) { - return; + if (completedNoCleanup.isEmpty() == false) { + logger.info("snapshots {} aborted", completedNoCleanup); } - if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) { - if (tryEnterRepoLoop(repoName)) { - deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion()); - } else { - logger.trace("Delete [{}] could not execute directly and was queued", newDelete); - } + for (Snapshot snapshot : completedNoCleanup) { + failSnapshotCompletionListeners(snapshot, + new SnapshotException(snapshot, SnapshotsInProgress.ABORTED_FAILURE_TEXT)); + } + if (newDelete == null) { + listener.onResponse(null); } else { - for (SnapshotsInProgress.Entry completedSnapshot : completedSnapshots) { - endSnapshot(completedSnapshot, newState.metadata(), repositoryData); + addDeleteListener(newDelete.uuid(), listener); + if (reusedExistingDelete) { + return; + } + if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) { + if (tryEnterRepoLoop(repoName)) { + deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion()); + } else { + logger.trace("Delete [{}] could not execute directly and was queued", newDelete); + } + } else { + for (SnapshotsInProgress.Entry completedSnapshot : completedWithCleanup) { + endSnapshot(completedSnapshot, newState.metadata(), repositoryData); + } } } }