Introduce ShardState Enum + Slight Cleanup SnapshotsInProgress (#41940) (#42573)

* Added separate enum for the state of each shard, it was really
confusing that we used the same enum for the state of the snapshot
overall and the state of each individual shard
   * relates https://github.com/elastic/elasticsearch/pull/40943#issuecomment-488664150
* Shortened some obvious spots in equals method and saved a few lines
via `computeIfAbsent` to make up for adding 50 new lines to this class
This commit is contained in:
Armin Braun 2019-05-27 12:08:45 +02:00 committed by GitHub
parent a96606d962
commit bb7e8eb2fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 97 additions and 60 deletions

View File

@ -174,7 +174,6 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction<Sn
break;
case INIT:
case WAITING:
case STARTED:
stage = SnapshotIndexShardStage.STARTED;
break;
case SUCCESS:

View File

@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
* Meta data about snapshots that are currently executing
@ -53,12 +54,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SnapshotsInProgress that = (SnapshotsInProgress) o;
if (!entries.equals(that.entries)) return false;
return true;
return entries.equals(((SnapshotsInProgress) o).entries);
}
@Override
@ -208,18 +204,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
return snapshot.toString();
}
// package private for testing
ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
private ImmutableOpenMap<String, List<ShardId>> findWaitingIndices(ImmutableOpenMap<ShardId, ShardSnapshotStatus> shards) {
Map<String, List<ShardId>> waitingIndicesMap = new HashMap<>();
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.WAITING) {
final String indexName = entry.key.getIndexName();
List<ShardId> waitingShards = waitingIndicesMap.get(indexName);
if (waitingShards == null) {
waitingShards = new ArrayList<>();
waitingIndicesMap.put(indexName, waitingShards);
}
waitingShards.add(entry.key);
if (entry.value.state() == ShardState.WAITING) {
waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key);
}
}
if (waitingIndicesMap.isEmpty()) {
@ -241,28 +230,27 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
*/
public static boolean completed(ObjectContainer<ShardSnapshotStatus> shards) {
for (ObjectCursor<ShardSnapshotStatus> status : shards) {
if (status.value.state().completed() == false) {
if (status.value.state().completed == false) {
return false;
}
}
return true;
}
public static class ShardSnapshotStatus {
private final State state;
private final ShardState state;
private final String nodeId;
private final String reason;
public ShardSnapshotStatus(String nodeId) {
this(nodeId, State.INIT);
this(nodeId, ShardState.INIT);
}
public ShardSnapshotStatus(String nodeId, State state) {
public ShardSnapshotStatus(String nodeId, ShardState state) {
this(nodeId, state, null);
}
public ShardSnapshotStatus(String nodeId, State state, String reason) {
public ShardSnapshotStatus(String nodeId, ShardState state, String reason) {
this.nodeId = nodeId;
this.state = state;
this.reason = reason;
@ -272,11 +260,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public ShardSnapshotStatus(StreamInput in) throws IOException {
nodeId = in.readOptionalString();
state = State.fromValue(in.readByte());
state = ShardState.fromValue(in.readByte());
reason = in.readOptionalString();
}
public State state() {
public ShardState state() {
return state;
}
@ -298,14 +286,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ShardSnapshotStatus status = (ShardSnapshotStatus) o;
return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state;
if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false;
if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false;
if (state != status.state) return false;
return true;
}
@Override
@ -331,11 +314,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);
private byte value;
private final byte value;
private boolean completed;
private final boolean completed;
private boolean failed;
private final boolean failed;
State(byte value, boolean completed, boolean failed) {
this.value = value;
@ -379,7 +362,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
private final List<Entry> entries;
public SnapshotsInProgress(List<Entry> entries) {
this.entries = entries;
}
@ -437,7 +419,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
builder.put(shardId, new ShardSnapshotStatus(in));
} else {
String nodeId = in.readOptionalString();
State shardState = State.fromValue(in.readByte());
ShardState shardState = ShardState.fromValue(in.readByte());
// Workaround for https://github.com/elastic/elasticsearch/issues/25878
// Some old snapshot might still have null in shard failure reasons
String reason = shardState.failed() ? "" : null;
@ -484,7 +466,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
shardEntry.value.writeTo(out);
} else {
out.writeOptionalString(shardEntry.value.nodeId());
out.writeByte(shardEntry.value.state().value());
out.writeByte(shardEntry.value.state().value);
}
}
out.writeLong(entry.repositoryStateId);
@ -555,4 +537,52 @@ public class SnapshotsInProgress extends AbstractNamedDiffable<Custom> implement
builder.endArray();
builder.endObject();
}
public enum ShardState {
INIT((byte) 0, false, false),
SUCCESS((byte) 2, true, false),
FAILED((byte) 3, true, true),
ABORTED((byte) 4, false, true),
MISSING((byte) 5, true, true),
WAITING((byte) 6, false, false);
private final byte value;
private final boolean completed;
private final boolean failed;
ShardState(byte value, boolean completed, boolean failed) {
this.value = value;
this.completed = completed;
this.failed = failed;
}
public boolean completed() {
return completed;
}
public boolean failed() {
return failed;
}
public static ShardState fromValue(byte value) {
switch (value) {
case 0:
return INIT;
case 2:
return SUCCESS;
case 3:
return FAILED;
case 4:
return ABORTED;
case 5:
return MISSING;
case 6:
return WAITING;
default:
throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]");
}
}
}
}

View File

@ -39,6 +39,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor;
import org.elasticsearch.cluster.ClusterStateTaskListener;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -246,7 +247,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
// Add all new shards to start processing on
final ShardId shardId = shard.key;
final ShardSnapshotStatus shardSnapshotStatus = shard.value;
if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT
if (localNodeId.equals(shardSnapshotStatus.nodeId())
&& shardSnapshotStatus.state() == ShardState.INIT
&& snapshotShards.containsKey(shardId) == false) {
logger.trace("[{}] - Adding shard to the queue", shardId);
if (startedShards == null) {
@ -284,7 +286,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
} else {
// due to CS batching we might have missed the INIT state and straight went into ABORTED
// notify master that abort has completed by moving to FAILED
if (shard.value.state() == State.ABORTED) {
if (shard.value.state() == ShardState.ABORTED) {
notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason());
}
}
@ -477,12 +479,14 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements
/** Notify the master node that the given shard has been successfully snapshotted **/
private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) {
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS));
sendSnapshotShardUpdate(snapshot, shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS));
}
/** Notify the master node that the given shard failed to be snapshotted **/
private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) {
sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure));
sendSnapshotShardUpdate(snapshot, shardId,
new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure));
}
/** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */

View File

@ -37,6 +37,7 @@ import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotDeletionsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
@ -774,7 +775,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
logger.warn("failing snapshot of shard [{}] on closed node [{}]",
shardEntry.key, shardStatus.nodeId());
shards.put(shardEntry.key,
new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown"));
new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown"));
}
}
}
@ -870,7 +871,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotShards) {
ShardSnapshotStatus shardStatus = shardEntry.value;
ShardId shardId = shardEntry.key;
if (shardStatus.state() == State.WAITING) {
if (shardStatus.state() == ShardState.WAITING) {
IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex());
if (indexShardRoutingTable != null) {
IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id());
@ -891,7 +892,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
// Shard that we were waiting for went into unassigned state or disappeared - giving up
snapshotChanged = true;
logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId());
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned"));
shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned"));
} else {
shards.put(shardId, shardStatus);
}
@ -941,7 +942,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
Set<String> missing = new HashSet<>();
Set<String> closed = new HashSet<>();
for (ObjectObjectCursor<ShardId, SnapshotsInProgress.ShardSnapshotStatus> entry : shards) {
if (entry.value.state() == State.MISSING) {
if (entry.value.state() == ShardState.MISSING) {
if (metaData.hasIndex(entry.key.getIndex().getName()) &&
metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) {
closed.add(entry.key.getIndex().getName());
@ -1192,7 +1193,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
for (ObjectObjectCursor<ShardId, ShardSnapshotStatus> shardEntry : snapshotEntry.shards()) {
ShardSnapshotStatus status = shardEntry.value;
if (status.state().completed() == false) {
status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion");
status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion");
}
shardsBuilder.put(shardEntry.key, status);
}
@ -1382,7 +1383,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
if (indexMetaData == null) {
// The index was deleted before we managed to start the snapshot - mark it as missing.
builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0),
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index"));
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index"));
} else {
IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName);
for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) {
@ -1391,18 +1392,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus
ShardRouting primary = indexRoutingTable.shard(i).primaryShard();
if (primary == null || !primary.assignedToNode()) {
builder.put(shardId,
new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated"));
new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated"));
} else if (primary.relocating() || primary.initializing()) {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING));
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING));
} else if (!primary.started()) {
builder.put(shardId,
new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING,
new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING,
"primary shard hasn't been started yet"));
} else {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId()));
}
} else {
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING,
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING,
"missing routing table"));
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.cluster;
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.index.shard.ShardId;
@ -55,11 +56,11 @@ public class SnapshotsInProgressTests extends ESTestCase {
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
// test more than one waiting shard in an index
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
// test exactly one waiting shard in an index
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING));
shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING));
shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
// test no waiting shards in an index
shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), ""));
@ -72,7 +73,7 @@ public class SnapshotsInProgressTests extends ESTestCase {
assertFalse(waitingIndices.containsKey(idx3Name));
}
private State randomNonWaitingState() {
return randomFrom(Arrays.stream(State.values()).filter(s -> s != State.WAITING).collect(Collectors.toSet()));
private ShardState randomNonWaitingState() {
return randomFrom(Arrays.stream(ShardState.values()).filter(s -> s != ShardState.WAITING).collect(Collectors.toSet()));
}
}

View File

@ -54,6 +54,7 @@ import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -2702,9 +2703,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
public ClusterState execute(ClusterState currentState) {
// Simulate orphan snapshot
ImmutableOpenMap.Builder<ShardId, ShardSnapshotStatus> shards = ImmutableOpenMap.builder();
shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted"));
shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted"));
List<Entry> entries = new ArrayList<>();
entries.add(new Entry(new Snapshot(repositoryName,
createSnapshotResponse.getSnapshotInfo().snapshotId()),

View File

@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.SnapshotsInProgress;
import org.elasticsearch.cluster.SnapshotsInProgress.Entry;
import org.elasticsearch.cluster.SnapshotsInProgress.ShardState;
import org.elasticsearch.cluster.SnapshotsInProgress.State;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@ -65,7 +66,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS
for (int j = 0; j < shardsCount; j++) {
ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10));
String nodeId = randomAlphaOfLength(10);
State shardState = randomFrom(State.values());
ShardState shardState = randomFrom(ShardState.values());
builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState,
shardState.failed() ? randomAlphaOfLength(10) : null));
}