Make Snapshot Deletes Less Racy (#54765) (#55226)

Snapshot deletes should first check the cluster state for an in-progress snapshot
and try to abort it before checking the repository contents. This allows for atomically
checking and aborting a snapshot in the same cluster state update, removing all possible
races where a snapshot that is in-progress could not be found if it finishes between
checking the repository contents and the cluster state.
Also removes confusing races, where checking the cluster state off of the cluster state thread
finds an in-progress snapshot that is then not found in the cluster state update to abort it.
Finally, the logic to use the repository generation of the in-progress snapshot + 1 was error
prone because it would always fail the delete when the repository had a pending generation different from its safe generation when a snapshot started (leading to the snapshot finalizing at a
higher generation).

These issues (particularly that last point) can easily be reproduced by running `SLMSnapshotBlockingIntegTests` in a loop with current `master` (see #54766).

The snapshot resiliency test for concurrent snapshot creation and deletion was made to more
aggressively start the delete operation so that the above races would become visible.
Previously, the fact that deletes would never coincide with initializing snapshots resulted
in a number of the above races not reproducing.

This PR is the most consistent I could get snapshot deletes without changes to the state machine. The fact that aborted deletes will not put the delete operation in the cluster state before waiting for the snapshot to abort still allows for some possible (though practically very unlikely) races. These will be fixed by a state-machine change in upcoming work in #54705 (which will have a much simpler and clearer diff after this change).

Closes #54766
This commit is contained in:
Armin Braun 2020-04-15 14:47:16 +02:00 committed by GitHub
parent 70616cd76a
commit d8b43c6283
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 203 additions and 174 deletions

View File

@ -53,7 +53,7 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
@Override @Override
protected String executor() { protected String executor() {
return ThreadPool.Names.GENERIC; return ThreadPool.Names.SAME;
} }
@Override @Override
@ -71,6 +71,6 @@ public class TransportDeleteSnapshotAction extends TransportMasterNodeAction<Del
protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state, protected void masterOperation(final DeleteSnapshotRequest request, ClusterState state,
final ActionListener<AcknowledgedResponse> listener) { final ActionListener<AcknowledgedResponse> listener) {
snapshotsService.deleteSnapshot(request.repository(), request.snapshot(), snapshotsService.deleteSnapshot(request.repository(), request.snapshot(),
ActionListener.map(listener, v -> new AcknowledgedResponse(true)), false); ActionListener.map(listener, v -> new AcknowledgedResponse(true)));
} }
} }

View File

@ -26,6 +26,7 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.repositories.RepositoryData;
import org.elasticsearch.repositories.RepositoryOperation; import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.Snapshot; import org.elasticsearch.snapshots.Snapshot;
@ -174,6 +175,8 @@ public class SnapshotDeletionsInProgress extends AbstractNamedDiffable<Custom> i
this.snapshot = snapshot; this.snapshot = snapshot;
this.startTime = startTime; this.startTime = startTime;
this.repositoryStateId = repositoryStateId; this.repositoryStateId = repositoryStateId;
assert repositoryStateId > RepositoryData.EMPTY_REPO_GEN :
"Can't delete based on an empty or unknown repository generation but saw [" + repositoryStateId + "]";
} }
public Entry(StreamInput in) throws IOException { public Entry(StreamInput in) throws IOException {

View File

@ -696,22 +696,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
entries.add(updatedSnapshot); entries.add(updatedSnapshot);
} else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) { } else if (snapshot.state() == State.INIT && initializingSnapshots.contains(snapshot.snapshot()) == false) {
changed = true; changed = true;
// Mark the snapshot as aborted as it failed to start from the previous master // A snapshot in INIT state hasn't yet written anything to the repository so we simply remove it
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards()); // from the cluster state without any further cleanup
entries.add(updatedSnapshot);
// Clean up the snapshot that failed to start from the old master
deleteSnapshot(snapshot.snapshot(), new ActionListener<Void>() {
@Override
public void onResponse(Void aVoid) {
logger.debug("cleaned up abandoned snapshot {} in INIT state", snapshot.snapshot());
}
@Override
public void onFailure(Exception e) {
logger.warn("failed to clean up abandoned snapshot {} in INIT state", snapshot.snapshot());
}
}, updatedSnapshot.repositoryStateId(), false);
} }
assert updatedSnapshot.shards().size() == snapshot.shards().size() assert updatedSnapshot.shards().size() == snapshot.shards().size()
: "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]"; : "Shard count changed during snapshot status update from [" + snapshot + "] to [" + updatedSnapshot + "]";
@ -1033,61 +1019,182 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
} }
/** /**
* Deletes a snapshot from the repository, looking up the {@link Snapshot} reference before deleting. * Deletes a snapshot from the repository or aborts a running snapshot.
* If the snapshot is still running cancels the snapshot first and then deletes it from the repository. * First checks if the snapshot is still running and if so cancels the snapshot and then deletes it from the repository.
* If the snapshot is not running, moves to trying to find a matching {@link Snapshot} for the given name in the repository and if
* one is found deletes it by invoking {@link #deleteCompletedSnapshot}.
* *
* @param repositoryName repositoryName * @param repositoryName repositoryName
* @param snapshotName snapshotName * @param snapshotName snapshotName
* @param listener listener * @param listener listener
*/ */
public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener<Void> listener, public void deleteSnapshot(final String repositoryName, final String snapshotName, final ActionListener<Void> listener) {
final boolean immediatePriority) { logger.info("deleting snapshot [{}] from repository [{}]", snapshotName, repositoryName);
// First, look for the snapshot in the repository
final Repository repository = repositoriesService.repository(repositoryName);
repository.getRepositoryData(ActionListener.wrap(repositoryData -> {
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
// if nothing found by the same name, then look in the cluster state for current in progress snapshots
long repoGenId = repositoryData.getGenId();
if (matchedEntry.isPresent() == false) {
Optional<SnapshotsInProgress.Entry> matchedInProgress = currentSnapshots(
clusterService.state().custom(SnapshotsInProgress.TYPE), repositoryName, Collections.emptyList()).stream()
.filter(s -> s.snapshot().getSnapshotId().getName().equals(snapshotName)).findFirst();
if (matchedInProgress.isPresent()) {
matchedEntry = matchedInProgress.map(s -> s.snapshot().getSnapshotId());
// Derive repository generation if a snapshot is in progress because it will increment the generation when it finishes
repoGenId = matchedInProgress.get().repositoryStateId() + 1L;
}
}
if (matchedEntry.isPresent() == false) {
throw new SnapshotMissingException(repositoryName, snapshotName);
}
deleteSnapshot(new Snapshot(repositoryName, matchedEntry.get()), listener, repoGenId, immediatePriority);
}, listener::onFailure));
}
/** clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
* Deletes snapshot from repository.
* <p>
* If the snapshot is still running cancels the snapshot first and then deletes it from the repository.
*
* @param snapshot snapshot
* @param listener listener
* @param repositoryStateId the unique id for the state of the repository
*/
private void deleteSnapshot(final Snapshot snapshot, final ActionListener<Void> listener, final long repositoryStateId,
final boolean immediatePriority) {
Priority priority = immediatePriority ? Priority.IMMEDIATE : Priority.NORMAL;
logger.info("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]",
snapshot, repositoryStateId, priority);
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
boolean waitForSnapshot = false; Snapshot runningSnapshot;
boolean abortedDuringInit = false; boolean abortedDuringInit = false;
@Override
public ClusterState execute(ClusterState currentState) {
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName);
if (snapshotEntry == null) {
return currentState;
}
runningSnapshot = snapshotEntry.snapshot();
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
final State state = snapshotEntry.state();
final String failure;
if (state == State.INIT) {
// snapshot is still initializing, mark it as aborted
shards = snapshotEntry.shards();
assert shards.isEmpty();
failure = "Snapshot was aborted during initialization";
abortedDuringInit = true;
} else if (state == State.STARTED) {
// snapshot is started - mark every non completed shard as aborted
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(
status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation());
}
shardsBuilder.put(shardEntry.key, status);
}
shards = shardsBuilder.build();
failure = "Snapshot was aborted by deletion";
} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
for (ObjectCursor<ShardSnapshotStatus> shardStatus : snapshotEntry.shards().values()) {
// Check if we still have shard running on existing nodes
if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null
&& currentState.nodes().get(shardStatus.value.nodeId()) != null) {
hasUncompletedShards = true;
break;
}
}
if (hasUncompletedShards) {
// snapshot is being finalized - wait for shards to complete finalization process
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
return currentState;
} else {
// no shards to wait for but a node is gone - this is the only case
// where we force to finish the snapshot
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
shards = snapshotEntry.shards();
}
failure = snapshotEntry.failure();
}
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
new SnapshotsInProgress(snapshots.entries().stream().map(existing -> {
if (existing.equals(snapshotEntry)) {
return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
}
return existing;
}).collect(Collectors.toList()))).build();
}
@Override
public void onFailure(String source, Exception e) {
listener.onFailure(e);
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (runningSnapshot == null) {
tryDeleteExisting(Priority.NORMAL);
return;
}
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(runningSnapshot, ActionListener.wrap(
snapshotInfo -> {
logger.debug("deleted snapshot completed - deleting files");
tryDeleteExisting(Priority.IMMEDIATE);
},
e -> {
if (abortedDuringInit) {
logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
listener.onResponse(null);
} else {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
!= null) {
logger.warn("master failover before deleted snapshot could complete", e);
// Just pass the exception to the transport handler as is so it is retried on the new master
listener.onFailure(e);
} else {
logger.warn("deleted snapshot failed", e);
listener.onFailure(
new SnapshotMissingException(runningSnapshot.getRepository(), runningSnapshot.getSnapshotId(), e));
}
}
}
));
}
private void tryDeleteExisting(Priority priority) {
threadPool.generic().execute(ActionRunnable.wrap(listener, l ->
repositoriesService.repository(repositoryName).getRepositoryData(ActionListener.wrap(repositoryData -> {
Optional<SnapshotId> matchedEntry = repositoryData.getSnapshotIds()
.stream()
.filter(s -> s.getName().equals(snapshotName))
.findFirst();
// If we can't find the snapshot by the given name in the repository at all or if the snapshot we find in the
// repository is not the one we expected to find when waiting for a finishing snapshot we fail.
// Note: Not finding a snapshot we expected to find is practically impossible as it would imply that the snapshot
// we waited for was concurrently deleted and another snapshot by the same name concurrently created
// during the context switch from the cluster state thread to the snapshot thread. We still guard against the
// possibility as a safety measure.
if (matchedEntry.isPresent() == false
|| (runningSnapshot != null && matchedEntry.get().equals(runningSnapshot.getSnapshotId()) == false)) {
if (runningSnapshot != null && matchedEntry.isPresent()) {
logger.warn("Waited for snapshot [{}}] but found snapshot [{}] in repository [{}]",
runningSnapshot.getSnapshotId(), matchedEntry.get(), repositoryName);
}
l.onFailure(new SnapshotMissingException(repositoryName, snapshotName));
} else {
deleteCompletedSnapshot(
new Snapshot(repositoryName, matchedEntry.get()), repositoryData.getGenId(), priority, l);
}
}, l::onFailure))));
}
});
}
// Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found
@Nullable
private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName,
String repositoryName) {
if (snapshots == null) {
return null;
}
SnapshotsInProgress.Entry snapshotEntry = null;
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
if (entry.repository().equals(repositoryName)
&& entry.snapshot().getSnapshotId().getName().equals(snapshotName)) {
snapshotEntry = entry;
break;
}
}
return snapshotEntry;
}
/**
* Deletes a snapshot that is assumed to be in the repository and not tracked as in-progress in the cluster state.
*
* @param snapshot Snapshot to delete
* @param repositoryStateId Repository generation to base the delete on
* @param listener Listener to complete when done
*/
private void deleteCompletedSnapshot(Snapshot snapshot, long repositoryStateId, Priority priority, ActionListener<Void> listener) {
logger.debug("deleting snapshot [{}] assuming repository generation [{}] and with priority [{}]", snapshot, repositoryStateId,
priority);
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(priority) {
@Override @Override
public ClusterState execute(ClusterState currentState) { public ClusterState execute(ClusterState currentState) {
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE); SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(SnapshotDeletionsInProgress.TYPE);
@ -1113,81 +1220,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
} }
} }
} }
ClusterState.Builder clusterStateBuilder = ClusterState.builder(currentState);
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE); SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
SnapshotsInProgress.Entry snapshotEntry = snapshots != null ? snapshots.snapshot(snapshot) : null; if (snapshots != null && snapshots.entries().isEmpty() == false) {
if (snapshotEntry == null) { // However other snapshots are running - cannot continue
// This snapshot is not running - delete throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete");
if (snapshots != null && !snapshots.entries().isEmpty()) {
// However other snapshots are running - cannot continue
throw new ConcurrentSnapshotExecutionException(snapshot, "another snapshot is currently running cannot delete");
}
// add the snapshot deletion to the cluster state
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
snapshot,
threadPool.absoluteTimeInMillis(),
repositoryStateId
);
if (deletionsInProgress != null) {
deletionsInProgress = deletionsInProgress.withAddedEntry(entry);
} else {
deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry);
}
clusterStateBuilder.putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress);
} else {
// This snapshot is currently running - stopping shards first
waitForSnapshot = true;
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
final State state = snapshotEntry.state();
final String failure;
if (state == State.INIT) {
// snapshot is still initializing, mark it as aborted
shards = snapshotEntry.shards();
assert shards.isEmpty();
failure = "Snapshot was aborted during initialization";
abortedDuringInit = true;
} else if (state == State.STARTED) {
// snapshot is started - mark every non completed shard as aborted
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(
status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion", status.generation());
}
shardsBuilder.put(shardEntry.key, status);
}
shards = shardsBuilder.build();
failure = "Snapshot was aborted by deletion";
} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
for (ObjectCursor<ShardSnapshotStatus> shardStatus : snapshotEntry.shards().values()) {
// Check if we still have shard running on existing nodes
if (shardStatus.value.state().completed() == false && shardStatus.value.nodeId() != null
&& currentState.nodes().get(shardStatus.value.nodeId()) != null) {
hasUncompletedShards = true;
break;
}
}
if (hasUncompletedShards) {
// snapshot is being finalized - wait for shards to complete finalization process
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
return currentState;
} else {
// no shards to wait for but a node is gone - this is the only case
// where we force to finish the snapshot
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
shards = snapshotEntry.shards();
}
failure = snapshotEntry.failure();
}
SnapshotsInProgress.Entry newSnapshot = new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
clusterStateBuilder.putCustom(SnapshotsInProgress.TYPE, new SnapshotsInProgress(newSnapshot));
} }
return clusterStateBuilder.build(); // add the snapshot deletion to the cluster state
SnapshotDeletionsInProgress.Entry entry = new SnapshotDeletionsInProgress.Entry(
snapshot,
threadPool.absoluteTimeInMillis(),
repositoryStateId
);
if (deletionsInProgress != null) {
deletionsInProgress = deletionsInProgress.withAddedEntry(entry);
} else {
deletionsInProgress = SnapshotDeletionsInProgress.newInstance(entry);
}
return ClusterState.builder(currentState).putCustom(SnapshotDeletionsInProgress.TYPE, deletionsInProgress).build();
} }
@Override @Override
@ -1197,42 +1246,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
@Override @Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (waitForSnapshot) { deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
addListener(snapshot, ActionListener.wrap(
snapshotInfo -> {
logger.debug("deleted snapshot completed - deleting files");
threadPool.executor(ThreadPool.Names.SNAPSHOT).execute(() -> {
try {
deleteSnapshot(snapshot.getRepository(), snapshot.getSnapshotId().getName(), listener, true);
} catch (Exception ex) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to delete snapshot", snapshot), ex);
}
}
);
},
e -> {
if (abortedDuringInit) {
logger.debug(() -> new ParameterizedMessage("Snapshot [{}] was aborted during INIT", snapshot), e);
listener.onResponse(null);
} else {
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
!= null) {
logger.warn("master failover before deleted snapshot could complete", e);
// Just pass the exception to the transport handler as is so it is retried on the new master
listener.onFailure(e);
} else {
logger.warn("deleted snapshot failed", e);
listener.onFailure(
new SnapshotMissingException(snapshot.getRepository(), snapshot.getSnapshotId(), e));
}
}
}
));
} else {
logger.debug("deleted snapshot is not running - deleting files");
deleteSnapshotFromRepository(snapshot, listener, repositoryStateId, newState.nodes().getMinNodeVersion());
}
} }
}); });
} }
@ -1352,6 +1366,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (failure != null) { if (failure != null) {
listener.onFailure(failure); listener.onFailure(failure);
} else { } else {
logger.info("Successfully deleted snapshot [{}]", snapshot);
listener.onResponse(null); listener.onResponse(null);
} }
} }

View File

@ -89,9 +89,11 @@ import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.update.UpdateHelper; import org.elasticsearch.action.update.UpdateHelper;
import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.node.NodeClient; import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterModule; import org.elasticsearch.cluster.ClusterModule;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ESAllocationTestCase; import org.elasticsearch.cluster.ESAllocationTestCase;
import org.elasticsearch.cluster.NodeConnectionsService; import org.elasticsearch.cluster.NodeConnectionsService;
import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress;
@ -428,8 +430,16 @@ public class SnapshotResiliencyTests extends ESTestCase {
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>(); final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
continueOrDie(createSnapshotResponseStepListener, createSnapshotResponse -> client().admin().cluster().deleteSnapshot( masterNode.clusterService.addListener(new ClusterStateListener() {
new DeleteSnapshotRequest(repoName, snapshotName), deleteSnapshotStepListener)); @Override
public void clusterChanged(ClusterChangedEvent event) {
final SnapshotsInProgress snapshotsInProgress = event.state().custom(SnapshotsInProgress.TYPE);
if (snapshotsInProgress != null && snapshotsInProgress.entries().isEmpty() == false) {
client().admin().cluster().prepareDeleteSnapshot(repoName, snapshotName).execute(deleteSnapshotStepListener);
masterNode.clusterService.removeListener(this);
}
}
});
final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>(); final StepListener<CreateSnapshotResponse> createAnotherSnapshotResponseStepListener = new StepListener<>();
@ -440,6 +450,7 @@ public class SnapshotResiliencyTests extends ESTestCase {
deterministicTaskQueue.runAllRunnableTasks(); deterministicTaskQueue.runAllRunnableTasks();
assertNotNull(createSnapshotResponseStepListener.result());
assertNotNull(createAnotherSnapshotResponseStepListener.result()); assertNotNull(createAnotherSnapshotResponseStepListener.result());
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE); SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false)); assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));