Avoid concurrent snapshot finalizations when deleting an INIT snapshot (#28078)
This commit removes the finalization of a snapshot by the snapshot deletion request. This way, the deletion marks the snapshot as ABORTED in cluster state and waits for the snapshot completion. It is the responsability of the snapshot execution to detect the abortion and terminates itself correctly. This avoids concurrent snapshot finalizations and also ordinates the operations: the deletion aborts the snapshot and waits for the snapshot completion, the creation detects the abortion and stops by itself and finalizes the snapshot, then the deletion resumes and continues the deletion process.
This commit is contained in:
parent
fd45a46ce8
commit
04ce0e7625
|
@ -372,8 +372,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
return;
|
||||
}
|
||||
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
|
||||
boolean accepted = false;
|
||||
SnapshotsInProgress.Entry updatedSnapshot;
|
||||
|
||||
SnapshotsInProgress.Entry endSnapshot;
|
||||
String failure = null;
|
||||
|
||||
@Override
|
||||
|
@ -381,17 +381,23 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
SnapshotsInProgress snapshots = currentState.custom(SnapshotsInProgress.TYPE);
|
||||
List<SnapshotsInProgress.Entry> entries = new ArrayList<>();
|
||||
for (SnapshotsInProgress.Entry entry : snapshots.entries()) {
|
||||
if (entry.snapshot().equals(snapshot.snapshot()) && entry.state() != State.ABORTED) {
|
||||
// Replace the snapshot that was just created
|
||||
if (entry.snapshot().equals(snapshot.snapshot()) == false) {
|
||||
entries.add(entry);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (entry.state() != State.ABORTED) {
|
||||
// Replace the snapshot that was just intialized
|
||||
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = shards(currentState, entry.indices());
|
||||
if (!partial) {
|
||||
Tuple<Set<String>, Set<String>> indicesWithMissingShards = indicesWithMissingShards(shards, currentState.metaData());
|
||||
Set<String> missing = indicesWithMissingShards.v1();
|
||||
Set<String> closed = indicesWithMissingShards.v2();
|
||||
if (missing.isEmpty() == false || closed.isEmpty() == false) {
|
||||
StringBuilder failureMessage = new StringBuilder();
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
|
||||
entries.add(updatedSnapshot);
|
||||
endSnapshot = new SnapshotsInProgress.Entry(entry, State.FAILED, shards);
|
||||
entries.add(endSnapshot);
|
||||
|
||||
final StringBuilder failureMessage = new StringBuilder();
|
||||
if (missing.isEmpty() == false) {
|
||||
failureMessage.append("Indices don't have primary shards ");
|
||||
failureMessage.append(missing);
|
||||
|
@ -407,13 +413,16 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
continue;
|
||||
}
|
||||
}
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
|
||||
SnapshotsInProgress.Entry updatedSnapshot = new SnapshotsInProgress.Entry(entry, State.STARTED, shards);
|
||||
entries.add(updatedSnapshot);
|
||||
if (!completed(shards.values())) {
|
||||
accepted = true;
|
||||
if (completed(shards.values())) {
|
||||
endSnapshot = updatedSnapshot;
|
||||
}
|
||||
} else {
|
||||
entries.add(entry);
|
||||
assert entry.state() == State.ABORTED : "expecting snapshot to be aborted during initialization";
|
||||
failure = "snapshot was aborted during initialization";
|
||||
endSnapshot = entry;
|
||||
entries.add(endSnapshot);
|
||||
}
|
||||
}
|
||||
return ClusterState.builder(currentState)
|
||||
|
@ -448,8 +457,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
// We should end snapshot only if 1) we didn't accept it for processing (which happens when there
|
||||
// is nothing to do) and 2) there was a snapshot in metadata that we should end. Otherwise we should
|
||||
// go ahead and continue working on this snapshot rather then end here.
|
||||
if (!accepted && updatedSnapshot != null) {
|
||||
endSnapshot(updatedSnapshot, failure);
|
||||
if (endSnapshot != null) {
|
||||
endSnapshot(endSnapshot, failure);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
@ -750,6 +759,11 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
}
|
||||
entries.add(updatedSnapshot);
|
||||
} else if (snapshot.state() == State.INIT && newMaster) {
|
||||
changed = true;
|
||||
// Mark the snapshot as aborted as it failed to start from the previous master
|
||||
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.ABORTED, snapshot.shards());
|
||||
entries.add(updatedSnapshot);
|
||||
|
||||
// Clean up the snapshot that failed to start from the old master
|
||||
deleteSnapshot(snapshot.snapshot(), new DeleteSnapshotListener() {
|
||||
@Override
|
||||
|
@ -935,7 +949,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
*
|
||||
* @param entry snapshot
|
||||
*/
|
||||
void endSnapshot(SnapshotsInProgress.Entry entry) {
|
||||
void endSnapshot(final SnapshotsInProgress.Entry entry) {
|
||||
endSnapshot(entry, null);
|
||||
}
|
||||
|
||||
|
@ -1144,24 +1158,26 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
} else {
|
||||
// This snapshot is currently running - stopping shards first
|
||||
waitForSnapshot = true;
|
||||
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
|
||||
if (snapshotEntry.state() == State.STARTED && snapshotEntry.shards() != null) {
|
||||
// snapshot is currently running - stop started shards
|
||||
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
|
||||
|
||||
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards;
|
||||
|
||||
final State state = snapshotEntry.state();
|
||||
if (state == State.INIT) {
|
||||
// snapshot is still initializing, mark it as aborted
|
||||
shards = snapshotEntry.shards();
|
||||
|
||||
} else if (state == State.STARTED) {
|
||||
// snapshot is started - mark every non completed shard as aborted
|
||||
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
|
||||
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
|
||||
ShardSnapshotStatus status = shardEntry.value;
|
||||
if (!status.state().completed()) {
|
||||
shardsBuilder.put(shardEntry.key, new ShardSnapshotStatus(status.nodeId(), State.ABORTED,
|
||||
"aborted by snapshot deletion"));
|
||||
} else {
|
||||
shardsBuilder.put(shardEntry.key, status);
|
||||
if (status.state().completed() == false) {
|
||||
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
|
||||
}
|
||||
shardsBuilder.put(shardEntry.key, status);
|
||||
}
|
||||
shards = shardsBuilder.build();
|
||||
} else if (snapshotEntry.state() == State.INIT) {
|
||||
// snapshot hasn't started yet - end it
|
||||
shards = snapshotEntry.shards();
|
||||
endSnapshot(snapshotEntry);
|
||||
|
||||
} else {
|
||||
boolean hasUncompletedShards = false;
|
||||
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
|
||||
|
@ -1178,7 +1194,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
|
|||
logger.debug("trying to delete completed snapshot - should wait for shards to finalize on all nodes");
|
||||
return currentState;
|
||||
} else {
|
||||
// no shards to wait for - finish the snapshot
|
||||
// no shards to wait for but a node is gone - this is the only case
|
||||
// where we force to finish the snapshot
|
||||
logger.debug("trying to delete completed snapshot with no finalizing shards - can delete immediately");
|
||||
shards = snapshotEntry.shards();
|
||||
endSnapshot(snapshotEntry);
|
||||
|
|
|
@ -3151,7 +3151,7 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertThat(shardStats.getSeqNoStats().getMaxSeqNo(), equalTo(15L));
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/27974")
|
||||
@TestLogging("org.elasticsearch.snapshots:TRACE")
|
||||
public void testAbortedSnapshotDuringInitDoesNotStart() throws Exception {
|
||||
final Client client = client();
|
||||
|
||||
|
@ -3163,11 +3163,11 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
));
|
||||
|
||||
createIndex("test-idx");
|
||||
final int nbDocs = scaledRandomIntBetween(1, 100);
|
||||
final int nbDocs = scaledRandomIntBetween(100, 500);
|
||||
for (int i = 0; i < nbDocs; i++) {
|
||||
index("test-idx", "_doc", Integer.toString(i), "foo", "bar" + i);
|
||||
}
|
||||
refresh();
|
||||
flushAndRefresh("test-idx");
|
||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().getTotalHits(), equalTo((long) nbDocs));
|
||||
|
||||
// Create a snapshot
|
||||
|
|
Loading…
Reference in New Issue