Refactor SnapshotsInProgress State Transitions (#60517) (#63266)

The copy constructors previously used were hard to read and the exact state changes
were not obvious at all.
Refactored those into a number of named constructors instead, added additional assertions
and moved the snapshot abort logic into `SnapshotsInProgress`.
This commit is contained in:
Armin Braun 2020-10-06 00:03:42 +02:00 committed by GitHub
parent 860791260d
commit e91936512a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 81 additions and 73 deletions

View File

@ -57,6 +57,7 @@ import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.snapshots.SnapshotInfoTests;
import org.elasticsearch.snapshots.SnapshotsInProgressSerializationTests;
import org.elasticsearch.test.ESIntegTestCase;
import java.util.Collections;
@ -716,11 +717,13 @@ public class ClusterStateDiffIT extends ESIntegTestCase {
new Snapshot(randomName("repo"), new SnapshotId(randomName("snap"), UUIDs.randomBase64UUID())),
randomBoolean(),
randomBoolean(),
randomFrom(SnapshotsInProgress.State.values()),
SnapshotsInProgressSerializationTests.randomState(ImmutableOpenMap.of()),
Collections.emptyList(),
Collections.emptyList(),
Math.abs(randomLong()),
(long) randomIntBetween(0, 1000),
randomIntBetween(0, 1000),
ImmutableOpenMap.of(),
null,
SnapshotInfoTests.randomUserMetadata(),
randomVersion(random()))));
case 1:

View File

@ -88,6 +88,19 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return builder.append("]").toString();
}
/**
* Creates the initial {@link Entry} when starting a snapshot, if no shard-level snapshot work is to be done the resulting entry
* will be in state {@link State#SUCCESS} right away otherwise it will be in state {@link State#STARTED}.
*/
public static Entry startedEntry(Snapshot snapshot, boolean includeGlobalState, boolean partial, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Map<String, Object> userMetadata,
Version version) {
return new SnapshotsInProgress.Entry(snapshot, includeGlobalState, partial,
completed(shards.values()) ? State.SUCCESS : State.STARTED,
indices, dataStreams, startTime, repositoryStateId, shards, null, userMetadata, version);
}
public static class Entry implements Writeable, ToXContent, RepositoryOperation {
private final State state;
private final Snapshot snapshot;
@ -103,6 +116,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
@Nullable private final Map<String, Object> userMetadata;
@Nullable private final String failure;
// visible for testing, use #startedEntry and copy constructors in production code
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
List<String> dataStreams, long startTime, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure, Map<String, Object> userMetadata,
@ -164,6 +178,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
shards.keysIt().forEachRemaining(s -> indexNamesInShards.add(s.getIndexName()));
assert indexNames.equals(indexNamesInShards)
: "Indices in shards " + indexNamesInShards + " differ from expected indices " + indexNames + " for state [" + state + "]";
final boolean shardsCompleted = completed(shards.values());
assert (state.completed() && shardsCompleted) || (state.completed() == false && shardsCompleted == false)
: "Completed state must imply all shards completed but saw state [" + state + "] and shards " + shards;
return true;
}
@ -174,33 +191,12 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
null, userMetadata, version);
}
public Entry(Snapshot snapshot, boolean includeGlobalState, boolean partial, State state, List<IndexId> indices,
long startTime, long repositoryStateId, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards,
Map<String, Object> userMetadata, Version version) {
this(snapshot, includeGlobalState, partial, state, indices, Collections.emptyList(), startTime, repositoryStateId, shards,
null, userMetadata, version);
}
public Entry(Entry entry, State state, List<IndexId> indices, long repositoryStateId,
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, Version version, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, indices, entry.dataStreams, entry.startTime,
repositoryStateId, shards, failure, entry.userMetadata, version);
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.dataStreams, entry.startTime,
entry.repositoryStateId, shards, entry.failure, entry.userMetadata, entry.version);
}
public Entry(Entry entry, State state, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, String failure) {
this(entry.snapshot, entry.includeGlobalState, entry.partial, state, entry.indices, entry.dataStreams, entry.startTime,
entry.repositoryStateId, shards, failure, entry.userMetadata, entry.version);
}
public Entry(Entry entry, ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
this(entry, entry.state, shards, entry.failure);
}
public Entry withRepoGen(long newRepoGen) {
assert newRepoGen > repositoryStateId : "Updated repository generation [" + newRepoGen
+ "] must be higher than current generation [" + repositoryStateId + "]";
@ -208,7 +204,31 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
userMetadata, version);
}
public Entry withShards(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
/**
* Create a new instance by aborting this instance. Moving all in-progress shards to {@link ShardState#ABORTED} if assigned to a
* data node or to {@link ShardState#FAILED} if not assigned to any data node.
* If the instance had no in-progress shard snapshots assigned to data nodes it's moved to state {@link State#SUCCESS}, otherwise
* it's moved to state {@link State#ABORTED}.
*
* @return aborted snapshot entry
*/
public Entry abort() {
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder = ImmutableOpenMap.builder();
boolean completed = true;
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : shards) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
final String nodeId = status.nodeId();
status = new ShardSnapshotStatus(nodeId, nodeId == null ? ShardState.FAILED : ShardState.ABORTED,
"aborted by snapshot deletion", status.generation());
}
completed &= status.state().completed();
shardsBuilder.put(shardEntry.key, status);
}
return fail(shardsBuilder.build(), completed ? State.SUCCESS : State.ABORTED, "Snapshot was aborted by deletion");
}
public Entry fail(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards, State state, String failure) {
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
failure, userMetadata, version);
}

View File

@ -425,10 +425,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
new Snapshot(repositoryName, snapshotId), "Indices don't have primary shards " + missing);
}
}
newEntry = new SnapshotsInProgress.Entry(
newEntry = SnapshotsInProgress.startedEntry(
new Snapshot(repositoryName, snapshotId), request.includeGlobalState(), request.partial(),
completed(shards.values()) ? State.SUCCESS : State.STARTED, indexIds, dataStreams,
threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards, null, userMeta, version);
indexIds, dataStreams, threadPool.absoluteTimeInMillis(), repositoryData.getGenId(), shards, userMeta, version);
final List<SnapshotsInProgress.Entry> newEntries = new ArrayList<>(runningSnapshots);
newEntries.add(newEntry);
return ClusterState.builder(currentState).putCustom(SnapshotsInProgress.TYPE,
@ -564,9 +563,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (indices.isEmpty()) {
// No indices in this snapshot - we are done
userCreateSnapshotListener.onResponse(snapshot.snapshot());
endSnapshot(new SnapshotsInProgress.Entry(
snapshot, State.STARTED, Collections.emptyList(), repositoryData.getGenId(), ImmutableOpenMap.of(), version,
null), clusterState.metadata(), repositoryData);
endSnapshot(SnapshotsInProgress.startedEntry(
snapshot.snapshot(), snapshot.includeGlobalState(), snapshot.partial(), Collections.emptyList(),
Collections.emptyList(), threadPool.absoluteTimeInMillis(), repositoryData.getGenId(),
ImmutableOpenMap.of(), snapshot.userMetadata(), version), clusterState.metadata(), repositoryData);
return;
}
clusterService.submitStateUpdateTask("update_snapshot [" + snapshot.snapshot() + "]", new ClusterStateUpdateTask() {
@ -735,7 +735,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
} else {
dataStreams.put(dataStreamName, dataStream);
}
};
}
return builder.dataStreams(dataStreams).build();
}
@ -937,13 +937,10 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards = processWaitingShardsAndRemovedNodes(snapshot.shards(),
routingTable, nodes, knownFailures.computeIfAbsent(snapshot.repository(), k -> new HashMap<>()));
if (shards != null) {
final SnapshotsInProgress.Entry updatedSnapshot;
final SnapshotsInProgress.Entry updatedSnapshot = snapshot.withShardStates(shards);
changed = true;
if (completed(shards.values())) {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, State.SUCCESS, shards);
if (updatedSnapshot.state().completed()) {
finishedSnapshots.add(updatedSnapshot);
} else {
updatedSnapshot = new SnapshotsInProgress.Entry(snapshot, shards);
}
updatedSnapshotEntries.add(updatedSnapshot);
} else {
@ -1575,8 +1572,9 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
abortedDuringInit = true;
} else if (state == State.STARTED) {
// snapshot is started - mark every non completed shard as aborted
shards = abortEntry(snapshotEntry);
failure = "Snapshot was aborted by deletion";
final SnapshotsInProgress.Entry abortedEntry = snapshotEntry.abort();
shards = abortedEntry.shards();
failure = abortedEntry.failure();
} else {
boolean hasUncompletedShards = false;
// Cleanup in case a node gone missing and snapshot wasn't updated for some reason
@ -1606,7 +1604,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
.filter(existing -> abortedDuringInit == false || existing.equals(snapshotEntry) == false)
.map(existing -> {
if (existing.equals(snapshotEntry)) {
return new SnapshotsInProgress.Entry(snapshotEntry, State.ABORTED, shards, failure);
return snapshotEntry.fail(shards, State.ABORTED, failure);
}
return existing;
}).collect(Collectors.toList()))).build();
@ -1766,12 +1764,8 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
.map(existing -> {
// snapshot is started - mark every non completed shard as aborted
if (existing.state() == State.STARTED && snapshotIds.contains(existing.snapshot().getSnapshotId())) {
final ImmutableOpenMap<ShardId, ShardSnapshotStatus> abortedShards = abortEntry(existing);
final boolean isCompleted = completed(abortedShards.values());
final SnapshotsInProgress.Entry abortedEntry = new SnapshotsInProgress.Entry(
existing, isCompleted ? State.SUCCESS : State.ABORTED, abortedShards,
"Snapshot was aborted by deletion");
if (isCompleted) {
final SnapshotsInProgress.Entry abortedEntry = existing.abort();
if (abortedEntry.state().completed()) {
completedSnapshots.add(abortedEntry);
}
return abortedEntry;
@ -1865,21 +1859,6 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
return false;
}
private ImmutableOpenMap<ShardId, ShardSnapshotStatus> abortEntry(SnapshotsInProgress.Entry existing) {
final ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shardsBuilder =
ImmutableOpenMap.builder();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : existing.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
final String nodeId = status.nodeId();
status = new ShardSnapshotStatus(nodeId, nodeId == null ? ShardState.FAILED : ShardState.ABORTED,
"aborted by snapshot deletion", status.generation());
}
shardsBuilder.put(shardEntry.key, status);
}
return shardsBuilder.build();
}
private void addDeleteListener(String deleteUUID, ActionListener<Void> listener) {
snapshotDeletionListeners.computeIfAbsent(deleteUUID, k -> new CopyOnWriteArrayList<>()).add(listener);
}
@ -2208,7 +2187,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
updatedAssignmentsBuilder.put(shardId, updated);
}
}
snapshotEntries.add(entry.withShards(updatedAssignmentsBuilder.build()));
snapshotEntries.add(entry.withStartedShards(updatedAssignmentsBuilder.build()));
changed = true;
}
} else {

View File

@ -41,6 +41,7 @@ import org.hamcrest.core.IsNull;
import org.junit.Before;
import java.util.HashSet;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Set;
@ -84,7 +85,7 @@ public class MetadataDeleteIndexServiceTests extends ESTestCase {
SnapshotsInProgress snaps = SnapshotsInProgress.of(
org.elasticsearch.common.collect.List.of(new SnapshotsInProgress.Entry(snapshot, true, false,
SnapshotsInProgress.State.INIT, singletonList(new IndexId(index, "doesn't matter")),
System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(),
Collections.emptyList(), System.currentTimeMillis(), (long) randomIntBetween(0, 1000), ImmutableOpenMap.of(), null,
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()))));
ClusterState state = ClusterState.builder(clusterState(index))
.putCustom(SnapshotsInProgress.TYPE, snaps)

View File

@ -461,10 +461,10 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
final Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(5), randomAlphaOfLength(5)));
final SnapshotsInProgress.Entry entry =
new SnapshotsInProgress.Entry(snapshot, randomBoolean(), false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId(index, index)), randomNonNegativeLong(), randomLong(), shardsBuilder.build(),
SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
return ClusterState.builder(newState).putCustom(
SnapshotsInProgress.TYPE, SnapshotsInProgress.of(Collections.singletonList(entry))).build();
Collections.singletonList(new IndexId(index, index)), Collections.emptyList(), randomNonNegativeLong(), randomLong(),
shardsBuilder.build(), null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
return ClusterState.builder(newState).putCustom(SnapshotsInProgress.TYPE,
SnapshotsInProgress.of(Collections.singletonList(entry))).build();
}
private static ClusterState addIndex(final ClusterState currentState,

View File

@ -56,7 +56,6 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
Snapshot snapshot = new Snapshot(randomAlphaOfLength(10), new SnapshotId(randomAlphaOfLength(10), randomAlphaOfLength(10)));
boolean includeGlobalState = randomBoolean();
boolean partial = randomBoolean();
State state = randomFrom(State.values());
int numberOfIndices = randomIntBetween(0, 10);
List<IndexId> indices = new ArrayList<>();
for (int i = 0; i < numberOfIndices; i++) {
@ -81,8 +80,8 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
}
}
ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards = builder.build();
return new Entry(snapshot, includeGlobalState, partial, state, indices, dataStreams, startTime, repositoryStateId, shards,
null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
return new Entry(snapshot, includeGlobalState, partial, randomState(shards), indices, dataStreams,
startTime, repositoryStateId, shards, null, SnapshotInfoTests.randomUserMetadata(), VersionUtils.randomVersion(random()));
}
@Override
@ -110,7 +109,8 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
// modify some elements
for (int i = 0; i < entries.size(); i++) {
if (randomBoolean()) {
entries.set(i, new Entry(entries.get(i), randomFrom(State.values()), entries.get(i).shards()));
final Entry entry = entries.get(i);
entries.set(i, entry.fail(entry.shards(), randomState(entry.shards()), entry.failure()));
}
}
}
@ -138,4 +138,9 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
}
return SnapshotsInProgress.of(entries);
}
public static State randomState(ImmutableOpenMap<ShardId, SnapshotsInProgress.ShardSnapshotStatus> shards) {
return SnapshotsInProgress.completed(shards.values())
? randomFrom(State.SUCCESS, State.FAILED) : randomFrom(State.STARTED, State.INIT, State.ABORTED);
}
}

View File

@ -111,7 +111,7 @@ public class DeleteDataStreamTransportActionTests extends ESTestCase {
new Snapshot(repo, new SnapshotId("", "")),
false,
partial,
SnapshotsInProgress.State.STARTED,
SnapshotsInProgress.State.SUCCESS,
Collections.emptyList(),
Collections.singletonList(dataStreamName),
0,

View File

@ -345,8 +345,8 @@ public class SnapshotRetentionTaskTests extends ESTestCase {
SnapshotsInProgress inProgress = SnapshotsInProgress.of(
Collections.singletonList(new SnapshotsInProgress.Entry(
snapshot, true, false, SnapshotsInProgress.State.INIT,
Collections.singletonList(new IndexId("name", "id")), 0, 0,
ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), Collections.emptyMap(),
Collections.singletonList(new IndexId("name", "id")), Collections.emptyList(), 0, 0,
ImmutableOpenMap.<ShardId, SnapshotsInProgress.ShardSnapshotStatus>builder().build(), null, Collections.emptyMap(),
VersionUtils.randomVersion(random()))));
ClusterState state = ClusterState.builder(new ClusterName("cluster"))
.putCustom(SnapshotsInProgress.TYPE, inProgress)