There is no need to let snapshots that have not yet written anything to the repository finalize as `FAILED`. Back when we still had the `INIT` state, we would simply remove such snapshots from the cluster state without any further action, and this change restores that behavior for fully queued snapshots.

This is not just a theoretical optimization. Currently, a large backlog of queued-up snapshots is fairly expensive to resolve once all of their queued shards move to aborted: tasks must be executed on the `SNAPSHOT` pool (which might be very busy) to remove each snapshot from the cluster state, and this entails a number of redundant cluster-state updates and repository writes that finalize these snapshots only to delete them right away afterwards.
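The idea in a minimal, self-contained sketch (simplified stand-in types, not the actual Elasticsearch classes): an abort helper returns null when every shard snapshot is still queued, signalling that the entry can be dropped from the cluster state directly, with no repository I/O and no `FAILED` finalization.

import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for SnapshotsInProgress.ShardState and Entry#abort();
// illustrative only, not the Elasticsearch implementation.
enum ShardState { QUEUED, STARTED, ABORTED, FAILED, SUCCESS }

final class AbortSketch {

    // Returns the shard-state map after an abort, or null when no shard has made
    // any progress, in which case the caller may drop the snapshot entry from the
    // cluster state outright instead of finalizing it as FAILED.
    static Map<String, ShardState> abort(Map<String, ShardState> shards) {
        if (shards.values().stream().allMatch(state -> state == ShardState.QUEUED)) {
            return null; // nothing was written to the repository, no abort needed
        }
        final Map<String, ShardState> aborted = new HashMap<>(shards);
        // mark every shard snapshot that has not completed yet as aborted
        aborted.replaceAll((shard, state) ->
            state == ShardState.QUEUED || state == ShardState.STARTED ? ShardState.ABORTED : state);
        return aborted;
    }

    public static void main(String[] args) {
        assert abort(Map.of("s0", ShardState.QUEUED, "s1", ShardState.QUEUED)) == null
            : "an all-queued snapshot can be removed without any cleanup";
        System.out.println(abort(Map.of("s0", ShardState.STARTED))); // prints {s0=ABORTED}
    }
}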
parent 25fbc01459
commit d7f6812d78
ConcurrentSnapshotsIT.java

@@ -547,7 +547,8 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         // Second delete works out cleanly since the repo is unblocked now
         assertThat(secondDeleteFuture.get().isAcknowledged(), is(true));
         // Snapshot should have been aborted
-        assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.FAILED));
+        final SnapshotException snapshotException = expectThrows(SnapshotException.class, snapshotFuture::actionGet);
+        assertThat(snapshotException.getMessage(), containsString(SnapshotsInProgress.ABORTED_FAILURE_TEXT));

         assertThat(client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(), empty());
     }
@@ -573,7 +574,8 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         // Second delete works out cleanly since the repo is unblocked now
         assertThat(secondDeleteFuture.get().isAcknowledged(), is(true));
         // Snapshot should have been aborted
-        assertThat(snapshotFuture.get().getSnapshotInfo().state(), is(SnapshotState.FAILED));
+        final SnapshotException snapshotException = expectThrows(SnapshotException.class, snapshotFuture::actionGet);
+        assertThat(snapshotException.getMessage(), containsString(SnapshotsInProgress.ABORTED_FAILURE_TEXT));

         assertThat(client().admin().cluster().prepareGetSnapshots(repoName).get().getSnapshots(), empty());
     }
@@ -1223,6 +1225,30 @@ public class ConcurrentSnapshotsIT extends AbstractSnapshotIntegTestCase {
         assertThat(sne.getCause().getMessage(), containsString("exception after block"));
     }

+    public void testAbortNotStartedSnapshotWithoutIO() throws Exception {
+        internalCluster().startMasterOnlyNode();
+        final String dataNode = internalCluster().startDataOnlyNode();
+        final String repoName = "test-repo";
+        createRepository(repoName, "mock");
+        createIndexWithContent("test-index");
+
+        final ActionFuture<CreateSnapshotResponse> createSnapshot1Future =
+                startFullSnapshotBlockedOnDataNode("first-snapshot", repoName, dataNode);
+
+        final String snapshotTwo = "second-snapshot";
+        final ActionFuture<CreateSnapshotResponse> createSnapshot2Future = startFullSnapshot(repoName, snapshotTwo);
+
+        awaitNumberOfSnapshotsInProgress(2);
+
+        assertAcked(startDeleteSnapshot(repoName, snapshotTwo).get());
+        final SnapshotException sne = expectThrows(SnapshotException.class, createSnapshot2Future::actionGet);
+
+        assertFalse(createSnapshot1Future.isDone());
+        unblockNode(repoName, dataNode);
+        assertSuccessful(createSnapshot1Future);
+        assertThat(getRepositoryData(repoName).getGenId(), is(0L));
+    }
+
     private static String startDataNodeWithLargeSnapshotPool() {
         return internalCluster().startDataOnlyNode(LARGE_SNAPSHOT_POOL_SETTINGS);
     }
SnapshotsInProgress.java

@@ -66,6 +66,8 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implements Custom {

     public static final String TYPE = "snapshots";

+    public static final String ABORTED_FAILURE_TEXT = "Snapshot was aborted by deletion";
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
@@ -291,14 +293,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implements Custom {
      * data node or to {@link ShardState#FAILED} if not assigned to any data node.
      * If the instance had no in-progress shard snapshots assigned to data nodes it's moved to state {@link State#SUCCESS}, otherwise
      * it's moved to state {@link State#ABORTED}.
+     * In the special case where this instance has not yet made any progress on any shard this method just returns
+     * {@code null} since no abort is needed and the snapshot can simply be removed from the cluster state outright.
      *
-     * @return aborted snapshot entry
+     * @return aborted snapshot entry or {@code null} if entry can be removed from the cluster state directly
      */
+    @Nullable
     public Entry abort() {
         final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
         boolean completed = true;
+        boolean allQueued = true;
         for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
             ShardSnapshotStatus status = shardEntry.value;
+            allQueued &= status.state() == ShardState.QUEUED;
             if (status.state().completed() == false) {
                 final String nodeId = status.nodeId();
                 status = new ShardSnapshotStatus(nodeId, nodeId == null ? ShardState.FAILED : ShardState.ABORTED,
@@ -307,7 +314,10 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implements Custom {
             completed &= status.state().completed();
             shardsBuilder.put(shardEntry.key, status);
         }
-        return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, "Snapshot was aborted by deletion");
+        if (allQueued) {
+            return null;
+        }
+        return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, ABORTED_FAILURE_TEXT);
     }

     public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State state, String failure) {
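For reference, this is how the now-@Nullable contract of Entry#abort() is meant to be consumed; a hypothetical fragment using the field names from the SnapshotsService hunks below (illustrative, not part of the commit itself):

final SnapshotsInProgress.Entry abortedEntry = existing.abort();
if (abortedEntry == null) {
    // every shard was still QUEUED: the entry is dropped from the cluster state
    // and needs no repository cleanup at all
    completedNoCleanup.add(existing.snapshot());
} else if (abortedEntry.state().completed()) {
    // the abort completed the entry: it already wrote data and must be finalized,
    // then deleted from the repository
    completedWithCleanup.add(abortedEntry);
}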
SnapshotsService.java

@@ -2037,7 +2037,13 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {

         private boolean reusedExistingDelete = false;

-        private final Collection<SnapshotsInProgress.Entry> completedSnapshots = new ArrayList<>();
+        // Snapshots that had all of their shard snapshots in queued state and thus were removed from the
+        // cluster state right away
+        private final Collection<Snapshot> completedNoCleanup = new ArrayList<>();
+
+        // Snapshots that were aborted and that already wrote data to the repository and now have to be deleted
+        // from the repository after the cluster state update
+        private final Collection<SnapshotsInProgress.Entry> completedWithCleanup = new ArrayList<>();

         @Override
         public ClusterState execute(ClusterState currentState) {
@@ -2079,25 +2085,41 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
                         "cannot delete snapshot while it is being cloned");
                 }
             }
+            // Snapshot ids that will have to be physically deleted from the repository
+            final Set<SnapshotId> snapshotIdsRequiringCleanup = new HashSet<>(snapshotIds);
             final SnapshotsInProgress updatedSnapshots;
             if (minNodeVersion.onOrAfter(FULL_CONCURRENCY_VERSION)) {
                 updatedSnapshots = SnapshotsInProgress.of(snapshots.entries().stream()
-                        .map(existing -> {
-                            // snapshot is started - mark every non completed shard as aborted
-                            if (existing.state() == State.STARTED && snapshotIds.contains(existing.snapshot().getSnapshotId())) {
-                                final SnapshotsInProgress.Entry abortedEntry = existing.abort();
-                                if (abortedEntry.state().completed()) {
-                                    completedSnapshots.add(abortedEntry);
-                                }
-                                return abortedEntry;
-                            }
-                            return existing;
-                        }).collect(Collectors.toList()));
+                    .map(existing -> {
+                        if (existing.state() == State.STARTED &&
+                            snapshotIdsRequiringCleanup.contains(existing.snapshot().getSnapshotId())) {
+                            // snapshot is started - mark every non completed shard as aborted
+                            final SnapshotsInProgress.Entry abortedEntry = existing.abort();
+                            if (abortedEntry == null) {
+                                // No work has been done for this snapshot yet so we remove it from the cluster state directly
+                                final Snapshot existingNotYetStartedSnapshot = existing.snapshot();
+                                // Adding the snapshot to #endingSnapshots since we still have to resolve its listeners to not trip
+                                // any leaked listener assertions
+                                if (endingSnapshots.add(existingNotYetStartedSnapshot)) {
+                                    completedNoCleanup.add(existingNotYetStartedSnapshot);
+                                }
+                                snapshotIdsRequiringCleanup.remove(existingNotYetStartedSnapshot.getSnapshotId());
+                            } else if (abortedEntry.state().completed()) {
+                                completedWithCleanup.add(abortedEntry);
+                            }
+                            return abortedEntry;
+                        }
+                        return existing;
+                    }).filter(Objects::nonNull).collect(Collectors.toList()));
+                if (snapshotIdsRequiringCleanup.isEmpty()) {
+                    // We only saw snapshots that could be removed from the cluster state right away, no need to update the deletions
+                    return updateWithSnapshots(currentState, updatedSnapshots, null);
+                }
             } else {
                 if (snapshots.entries().isEmpty() == false) {
                     // However other snapshots are running - cannot continue
                     throw new ConcurrentSnapshotExecutionException(
-                        repoName, snapshotIds.toString(), "another snapshot is currently running cannot delete");
+                            repoName, snapshotIds.toString(), "another snapshot is currently running cannot delete");
                 }
                 updatedSnapshots = snapshots;
             }
@@ -2115,9 +2137,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
                 reusedExistingDelete = true;
                 return currentState;
             }
-            ensureBelowConcurrencyLimit(repoName, snapshotIds.get(0).getName(), snapshots, deletionsInProgress);
+            final List<SnapshotId> toDelete = Collections.unmodifiableList(new ArrayList<>(snapshotIdsRequiringCleanup));
+            ensureBelowConcurrencyLimit(repoName, toDelete.get(0).getName(), snapshots, deletionsInProgress);
             newDelete = new SnapshotDeletionsInProgress.Entry(
-                snapshotIds,
+                toDelete,
                 repoName,
                 threadPool.absoluteTimeInMillis(),
                 repositoryData.getGenId(),
@@ -2127,7 +2150,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {
                     repoName.equals(entry.repository()) && entry.state() == SnapshotDeletionsInProgress.State.STARTED)
                     ? SnapshotDeletionsInProgress.State.STARTED : SnapshotDeletionsInProgress.State.WAITING);
             } else {
-                newDelete = replacedEntry.withAddedSnapshots(snapshotIds);
+                newDelete = replacedEntry.withAddedSnapshots(snapshotIdsRequiringCleanup);
             }
             return updateWithSnapshots(currentState, updatedSnapshots,
                 (replacedEntry == null ? deletionsInProgress : deletionsInProgress.withRemovedEntry(replacedEntry.uuid()))
@@ -2136,24 +2159,36 @@ public class SnapshotsService extends AbstractLifecycleComponent implements ClusterStateApplier {

             @Override
             public void onFailure(String source, Exception e) {
+                endingSnapshots.removeAll(completedNoCleanup);
                 listener.onFailure(e);
             }

             @Override
             public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
-                addDeleteListener(newDelete.uuid(), listener);
-                if (reusedExistingDelete) {
-                    return;
-                }
-                if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
-                    if (tryEnterRepoLoop(repoName)) {
-                        deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
-                    } else {
-                        logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
-                    }
-                } else {
-                    for (SnapshotsInProgress.Entry completedSnapshot : completedSnapshots) {
-                        endSnapshot(completedSnapshot, newState.metadata(), repositoryData);
-                    }
-                }
+                if (completedNoCleanup.isEmpty() == false) {
+                    logger.info("snapshots {} aborted", completedNoCleanup);
+                }
+                for (Snapshot snapshot : completedNoCleanup) {
+                    failSnapshotCompletionListeners(snapshot,
+                        new SnapshotException(snapshot, SnapshotsInProgress.ABORTED_FAILURE_TEXT));
+                }
+                if (newDelete == null) {
+                    listener.onResponse(null);
+                } else {
+                    addDeleteListener(newDelete.uuid(), listener);
+                    if (reusedExistingDelete) {
+                        return;
+                    }
+                    if (newDelete.state() == SnapshotDeletionsInProgress.State.STARTED) {
+                        if (tryEnterRepoLoop(repoName)) {
+                            deleteSnapshotsFromRepository(newDelete, repositoryData, newState.nodes().getMinNodeVersion());
+                        } else {
+                            logger.trace("Delete [{}] could not execute directly and was queued", newDelete);
+                        }
+                    } else {
+                        for (SnapshotsInProgress.Entry completedSnapshot : completedWithCleanup) {
+                            endSnapshot(completedSnapshot, newState.metadata(), repositoryData);
+                        }
+                    }
+                }
             }