Making use of #55773 to simplify snapshot state machine. 1. Deletes with no in-progress snapshot now add the delete entry to the cluster state right away instead of doing a second CS update after the fist update was a NOOP. 2. If a bulk delete matches in-progress as well as completed snapshots, abort the in-progress snapshot and then move on to delete from the repository.
This commit is contained in:
parent
76fa5a2397
commit
e8ef44ce78
|
@ -997,11 +997,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
|
||||
/**
|
||||
* Deletes snapshots from the repository or aborts a running snapshot.
|
||||
* If deleting a single snapshot, first checks if a snapshot is still running and if so cancels the snapshot and then deletes it from
|
||||
* the repository.
|
||||
* If the snapshot is not running or multiple snapshot names are given, moves to trying to find a matching {@link Snapshot}s for the
|
||||
* given names in the repository and deletes them.
|
||||
* Deletes snapshots from the repository. In-progress snapshots matched by the delete will be aborted before deleting them.
|
||||
*
|
||||
* @param request delete snapshot request
|
||||
* @param listener listener
|
||||
|
@ -1013,39 +1009,46 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
logger.info(() -> new ParameterizedMessage("deleting snapshots [{}] from repository [{}]",
|
||||
Strings.arrayToCommaDelimitedString(snapshotNames), repositoryName));
|
||||
|
||||
clusterService.submitStateUpdateTask("delete snapshot", new ClusterStateUpdateTask(Priority.NORMAL) {
|
||||
final Repository repository = repositoriesService.repository(repositoryName);
|
||||
repository.executeConsistentStateUpdate(repositoryData -> new ClusterStateUpdateTask(Priority.NORMAL) {
|
||||
|
||||
Snapshot runningSnapshot;
|
||||
private Snapshot runningSnapshot;
|
||||
|
||||
boolean abortedDuringInit = false;
|
||||
private ClusterStateUpdateTask deleteFromRepoTask;
|
||||
|
||||
private boolean abortedDuringInit = false;
|
||||
|
||||
private List<SnapshotId> outstandingDeletes;
|
||||
|
||||
@Override
|
||||
public ClusterState execute(ClusterState currentState) {
|
||||
public ClusterState execute(ClusterState currentState) throws Exception {
|
||||
if (snapshotNames.length > 1 && currentState.nodes().getMinNodeVersion().before(MULTI_DELETE_VERSION)) {
|
||||
throw new IllegalArgumentException("Deleting multiple snapshots in a single request is only supported in version [ "
|
||||
+ MULTI_DELETE_VERSION + "] but cluster contained node of version [" + currentState.nodes().getMinNodeVersion()
|
||||
+ "]");
|
||||
}
|
||||
final SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
final SnapshotsInProgress.Entry snapshotEntry;
|
||||
if (snapshotNames.length == 1) {
|
||||
final String snapshotName = snapshotNames[0];
|
||||
if (Regex.isSimpleMatchPattern(snapshotName)) {
|
||||
snapshotEntry = null;
|
||||
} else {
|
||||
snapshotEntry = findInProgressSnapshot(snapshots, snapshotName, repositoryName);
|
||||
}
|
||||
} else {
|
||||
snapshotEntry = null;
|
||||
}
|
||||
final SnapshotsInProgress.Entry snapshotEntry = findInProgressSnapshot(snapshots, snapshotNames, repositoryName);
|
||||
final List<SnapshotId> snapshotIds = matchingSnapshotIds(
|
||||
snapshotEntry == null ? null : snapshotEntry.snapshot().getSnapshotId(),
|
||||
repositoryData, snapshotNames, repositoryName);
|
||||
if (snapshotEntry == null) {
|
||||
return currentState;
|
||||
deleteFromRepoTask =
|
||||
createDeleteStateUpdate(snapshotIds, repositoryName, repositoryData.getGenId(), Priority.NORMAL, listener);
|
||||
return deleteFromRepoTask.execute(currentState);
|
||||
}
|
||||
|
||||
runningSnapshot = snapshotEntry.snapshot();
|
||||
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
|
||||
|
||||
final State state = snapshotEntry.state();
|
||||
final String failure;
|
||||
|
||||
outstandingDeletes = new ArrayList<>(snapshotIds);
|
||||
if (state != State.INIT) {
|
||||
// INIT state snapshots won't ever be physically written to the repository but all other states will end up in the repo
|
||||
outstandingDeletes.add(runningSnapshot.getSnapshotId());
|
||||
}
|
||||
if (state == State.INIT) {
|
||||
// snapshot is still initializing, mark it as aborted
|
||||
shards = snapshotEntry.shards();
|
||||
|
@ -1104,15 +1107,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
@Override
|
||||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
if (runningSnapshot == null) {
|
||||
try {
|
||||
repositoriesService.repository(repositoryName).executeConsistentStateUpdate(repositoryData ->
|
||||
createDeleteStateUpdate(matchingSnapshotIds(repositoryData, snapshotNames, repositoryName), repositoryName,
|
||||
repositoryData.getGenId(), request.masterNodeTimeout(), Priority.NORMAL, listener),
|
||||
"delete completed snapshots", listener::onFailure);
|
||||
} catch (RepositoryMissingException e) {
|
||||
listener.onFailure(e);
|
||||
}
|
||||
if (deleteFromRepoTask != null) {
|
||||
assert outstandingDeletes == null : "Shouldn't have outstanding deletes after already starting delete task";
|
||||
deleteFromRepoTask.clusterStateProcessed(source, oldState, newState);
|
||||
return;
|
||||
}
|
||||
logger.trace("adding snapshot completion listener to wait for deleted snapshot to finish");
|
||||
|
@ -1120,13 +1117,19 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
result -> {
|
||||
logger.debug("deleted snapshot completed - deleting files");
|
||||
clusterService.submitStateUpdateTask("delete snapshot",
|
||||
createDeleteStateUpdate(Collections.singletonList(result.v2().snapshotId()), repositoryName,
|
||||
result.v1().getGenId(), null, Priority.IMMEDIATE, listener));
|
||||
createDeleteStateUpdate(outstandingDeletes, repositoryName,
|
||||
result.v1().getGenId(), Priority.IMMEDIATE, listener));
|
||||
},
|
||||
e -> {
|
||||
if (abortedDuringInit) {
|
||||
logger.info("Successfully aborted snapshot [{}]", runningSnapshot);
|
||||
listener.onResponse(null);
|
||||
if (outstandingDeletes.isEmpty()) {
|
||||
listener.onResponse(null);
|
||||
} else {
|
||||
clusterService.submitStateUpdateTask("delete snapshot",
|
||||
createDeleteStateUpdate(outstandingDeletes, repositoryName, repositoryData.getGenId(),
|
||||
Priority.IMMEDIATE, listener));
|
||||
}
|
||||
} else {
|
||||
if (ExceptionsHelper.unwrap(e, NotMasterException.class, FailedToCommitClusterStateException.class)
|
||||
!= null) {
|
||||
|
@ -1147,28 +1150,30 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
public TimeValue timeout() {
|
||||
return request.masterNodeTimeout();
|
||||
}
|
||||
});
|
||||
}, "delete snapshot", listener::onFailure);
|
||||
}
|
||||
|
||||
private static List<SnapshotId> matchingSnapshotIds(RepositoryData repositoryData, String[] snapshotsOrPatterns,
|
||||
String repositoryName) {
|
||||
private static List<SnapshotId> matchingSnapshotIds(@Nullable SnapshotId inProgress, RepositoryData repositoryData,
|
||||
String[] snapshotsOrPatterns, String repositoryName) {
|
||||
final Map<String, SnapshotId> allSnapshotIds = repositoryData.getSnapshotIds().stream().collect(
|
||||
Collectors.toMap(SnapshotId::getName, Function.identity()));
|
||||
final Set<SnapshotId> foundSnapshots = new HashSet<>();
|
||||
for (String snapshotOrPattern : snapshotsOrPatterns) {
|
||||
if (Regex.isSimpleMatchPattern(snapshotOrPattern) == false) {
|
||||
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
|
||||
if (foundId == null) {
|
||||
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
|
||||
} else {
|
||||
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
|
||||
}
|
||||
} else {
|
||||
if (Regex.isSimpleMatchPattern(snapshotOrPattern)) {
|
||||
for (Map.Entry<String, SnapshotId> entry : allSnapshotIds.entrySet()) {
|
||||
if (Regex.simpleMatch(snapshotOrPattern, entry.getKey())) {
|
||||
foundSnapshots.add(entry.getValue());
|
||||
}
|
||||
}
|
||||
} else {
|
||||
final SnapshotId foundId = allSnapshotIds.get(snapshotOrPattern);
|
||||
if (foundId == null) {
|
||||
if (inProgress == null || inProgress.getName().equals(snapshotOrPattern) == false) {
|
||||
throw new SnapshotMissingException(repositoryName, snapshotOrPattern);
|
||||
}
|
||||
} else {
|
||||
foundSnapshots.add(allSnapshotIds.get(snapshotOrPattern));
|
||||
}
|
||||
}
|
||||
}
|
||||
return Collections.unmodifiableList(new ArrayList<>(foundSnapshots));
|
||||
|
@ -1176,7 +1181,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
|
||||
// Return in-progress snapshot entry by name and repository in the given cluster state or null if none is found
|
||||
@Nullable
|
||||
private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String snapshotName,
|
||||
private static SnapshotsInProgress.Entry findInProgressSnapshot(@Nullable SnapshotsInProgress snapshots, String[] snapshotNames,
|
||||
String repositoryName) {
|
||||
if (snapshots == null) {
|
||||
return null;
|
||||
|
@ -1184,7 +1189,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
SnapshotsInProgress.Entry snapshotEntry = null;
|
||||
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
|
||||
if (entry.repository().equals(repositoryName)
|
||||
&& entry.snapshot().getSnapshotId().getName().equals(snapshotName)) {
|
||||
&& Regex.simpleMatch(snapshotNames, entry.snapshot().getSnapshotId().getName())) {
|
||||
snapshotEntry = entry;
|
||||
break;
|
||||
}
|
||||
|
@ -1193,7 +1198,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
|
||||
private ClusterStateUpdateTask createDeleteStateUpdate(List<SnapshotId> snapshotIds, String repoName, long repositoryStateId,
|
||||
@Nullable TimeValue timeout, Priority priority, ActionListener<Void> listener) {
|
||||
Priority priority, ActionListener<Void> listener) {
|
||||
// Short circuit to noop state update if there isn't anything to delete
|
||||
if (snapshotIds.isEmpty()) {
|
||||
return new ClusterStateUpdateTask() {
|
||||
|
@ -1211,11 +1216,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimeValue timeout() {
|
||||
return timeout;
|
||||
}
|
||||
};
|
||||
}
|
||||
return new ClusterStateUpdateTask(priority) {
|
||||
|
|
|
@ -578,6 +578,53 @@ public class SnapshotResiliencyTests extends ESTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testBulkSnapshotDeleteWithAbort() {
|
||||
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
||||
|
||||
String repoName = "repo";
|
||||
String snapshotName = "snapshot";
|
||||
final String index = "test";
|
||||
final int shards = randomIntBetween(1, 10);
|
||||
|
||||
TestClusterNodes.TestClusterNode masterNode =
|
||||
testClusterNodes.currentMaster(testClusterNodes.nodes.values().iterator().next().clusterService.state());
|
||||
|
||||
final StepListener<CreateSnapshotResponse> createSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(createRepoAndIndex(repoName, index, shards),
|
||||
createIndexResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, snapshotName)
|
||||
.setWaitForCompletion(true).execute(createSnapshotResponseStepListener));
|
||||
|
||||
final StepListener<CreateSnapshotResponse> createOtherSnapshotResponseStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(createSnapshotResponseStepListener,
|
||||
createSnapshotResponse -> client().admin().cluster().prepareCreateSnapshot(repoName, "snapshot-2")
|
||||
.execute(createOtherSnapshotResponseStepListener));
|
||||
|
||||
final StepListener<AcknowledgedResponse> deleteSnapshotStepListener = new StepListener<>();
|
||||
|
||||
continueOrDie(createOtherSnapshotResponseStepListener,
|
||||
createSnapshotResponse -> client().admin().cluster().deleteSnapshot(
|
||||
new DeleteSnapshotRequest(repoName, "*"), deleteSnapshotStepListener));
|
||||
|
||||
deterministicTaskQueue.runAllRunnableTasks();
|
||||
|
||||
SnapshotsInProgress finalSnapshotsInProgress = masterNode.clusterService.state().custom(SnapshotsInProgress.TYPE);
|
||||
assertFalse(finalSnapshotsInProgress.entries().stream().anyMatch(entry -> entry.state().completed() == false));
|
||||
final Repository repository = masterNode.repositoriesService.repository(repoName);
|
||||
Collection<SnapshotId> snapshotIds = getRepositoryData(repository).getSnapshotIds();
|
||||
// No snapshots should be left in the repository
|
||||
assertThat(snapshotIds, empty());
|
||||
|
||||
for (SnapshotId snapshotId : snapshotIds) {
|
||||
final SnapshotInfo snapshotInfo = repository.getSnapshotInfo(snapshotId);
|
||||
assertEquals(SnapshotState.SUCCESS, snapshotInfo.state());
|
||||
assertThat(snapshotInfo.indices(), containsInAnyOrder(index));
|
||||
assertEquals(shards, snapshotInfo.successfulShards());
|
||||
assertEquals(0, snapshotInfo.failedShards());
|
||||
}
|
||||
}
|
||||
|
||||
public void testConcurrentSnapshotRestoreAndDeleteOther() {
|
||||
setupTestCluster(randomFrom(1, 3, 5), randomIntBetween(2, 10));
|
||||
|
||||
|
|
Loading…
Reference in New Issue