From bb7e8eb2fd72cb9ed88d6e5932c46e3c526afa75 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 27 May 2019 12:08:45 +0200 Subject: [PATCH] Introduce ShardState Enum + Slight Cleanup SnapshotsInProgress (#41940) (#42573) * Added separate enum for the state of each shard, it was really confusing that we used the same enum for the state of the snapshot overall and the state of each individual shard * relates https://github.com/elastic/elasticsearch/pull/40943#issuecomment-488664150 * Shortened some obvious spots in equals method and saved a few lines via `computeIfAbsent` to make up for adding 50 new lines to this class --- .../TransportSnapshotsStatusAction.java | 1 - .../cluster/SnapshotsInProgress.java | 102 +++++++++++------- .../snapshots/SnapshotShardsService.java | 12 ++- .../snapshots/SnapshotsService.java | 21 ++-- .../cluster/SnapshotsInProgressTests.java | 11 +- .../SharedClusterSnapshotRestoreIT.java | 7 +- ...SnapshotsInProgressSerializationTests.java | 3 +- 7 files changed, 97 insertions(+), 60 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java index c2f0d3dd0c0..8430d1868c8 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/snapshots/status/TransportSnapshotsStatusAction.java @@ -174,7 +174,6 @@ public class TransportSnapshotsStatusAction extends TransportMasterNodeAction implement public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - - SnapshotsInProgress that = (SnapshotsInProgress) o; - - if (!entries.equals(that.entries)) return false; - - return true; + return entries.equals(((SnapshotsInProgress) o).entries); } @Override @@ -208,18 +204,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement return snapshot.toString(); } - // package private for testing - ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { + private ImmutableOpenMap> findWaitingIndices(ImmutableOpenMap shards) { Map> waitingIndicesMap = new HashMap<>(); for (ObjectObjectCursor entry : shards) { - if (entry.value.state() == State.WAITING) { - final String indexName = entry.key.getIndexName(); - List waitingShards = waitingIndicesMap.get(indexName); - if (waitingShards == null) { - waitingShards = new ArrayList<>(); - waitingIndicesMap.put(indexName, waitingShards); - } - waitingShards.add(entry.key); + if (entry.value.state() == ShardState.WAITING) { + waitingIndicesMap.computeIfAbsent(entry.key.getIndexName(), k -> new ArrayList<>()).add(entry.key); } } if (waitingIndicesMap.isEmpty()) { @@ -241,28 +230,27 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement */ public static boolean completed(ObjectContainer shards) { for (ObjectCursor status : shards) { - if (status.value.state().completed() == false) { + if (status.value.state().completed == false) { return false; } } return true; } - public static class ShardSnapshotStatus { - private final State state; + private final ShardState state; private final String nodeId; private final String reason; public ShardSnapshotStatus(String nodeId) { - this(nodeId, State.INIT); + this(nodeId, ShardState.INIT); } - public ShardSnapshotStatus(String nodeId, State state) { + public ShardSnapshotStatus(String nodeId, ShardState state) { this(nodeId, state, null); } - public ShardSnapshotStatus(String nodeId, State state, String reason) { + public ShardSnapshotStatus(String nodeId, ShardState state, String reason) { this.nodeId = nodeId; this.state = state; this.reason = reason; @@ -272,11 +260,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public ShardSnapshotStatus(StreamInput in) throws IOException { nodeId = in.readOptionalString(); - state = State.fromValue(in.readByte()); + state = ShardState.fromValue(in.readByte()); reason = in.readOptionalString(); } - public State state() { + public ShardState state() { return state; } @@ -298,14 +286,9 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; - ShardSnapshotStatus status = (ShardSnapshotStatus) o; + return Objects.equals(nodeId, status.nodeId) && Objects.equals(reason, status.reason) && state == status.state; - if (nodeId != null ? !nodeId.equals(status.nodeId) : status.nodeId != null) return false; - if (reason != null ? !reason.equals(status.reason) : status.reason != null) return false; - if (state != status.state) return false; - - return true; } @Override @@ -331,11 +314,11 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement MISSING((byte) 5, true, true), WAITING((byte) 6, false, false); - private byte value; + private final byte value; - private boolean completed; + private final boolean completed; - private boolean failed; + private final boolean failed; State(byte value, boolean completed, boolean failed) { this.value = value; @@ -379,7 +362,6 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement private final List entries; - public SnapshotsInProgress(List entries) { this.entries = entries; } @@ -437,7 +419,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement builder.put(shardId, new ShardSnapshotStatus(in)); } else { String nodeId = in.readOptionalString(); - State shardState = State.fromValue(in.readByte()); + ShardState shardState = ShardState.fromValue(in.readByte()); // Workaround for https://github.com/elastic/elasticsearch/issues/25878 // Some old snapshot might still have null in shard failure reasons String reason = shardState.failed() ? "" : null; @@ -484,7 +466,7 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement shardEntry.value.writeTo(out); } else { out.writeOptionalString(shardEntry.value.nodeId()); - out.writeByte(shardEntry.value.state().value()); + out.writeByte(shardEntry.value.state().value); } } out.writeLong(entry.repositoryStateId); @@ -555,4 +537,52 @@ public class SnapshotsInProgress extends AbstractNamedDiffable implement builder.endArray(); builder.endObject(); } + + public enum ShardState { + INIT((byte) 0, false, false), + SUCCESS((byte) 2, true, false), + FAILED((byte) 3, true, true), + ABORTED((byte) 4, false, true), + MISSING((byte) 5, true, true), + WAITING((byte) 6, false, false); + + private final byte value; + + private final boolean completed; + + private final boolean failed; + + ShardState(byte value, boolean completed, boolean failed) { + this.value = value; + this.completed = completed; + this.failed = failed; + } + + public boolean completed() { + return completed; + } + + public boolean failed() { + return failed; + } + + public static ShardState fromValue(byte value) { + switch (value) { + case 0: + return INIT; + case 2: + return SUCCESS; + case 3: + return FAILED; + case 4: + return ABORTED; + case 5: + return MISSING; + case 6: + return WAITING; + default: + throw new IllegalArgumentException("No shard snapshot state for value [" + value + "]"); + } + } + } } diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java index 16eb3bad1b5..347bd714af7 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotShardsService.java @@ -39,6 +39,7 @@ import org.elasticsearch.cluster.ClusterStateTaskExecutor; import org.elasticsearch.cluster.ClusterStateTaskListener; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -246,7 +247,8 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements // Add all new shards to start processing on final ShardId shardId = shard.key; final ShardSnapshotStatus shardSnapshotStatus = shard.value; - if (localNodeId.equals(shardSnapshotStatus.nodeId()) && shardSnapshotStatus.state() == State.INIT + if (localNodeId.equals(shardSnapshotStatus.nodeId()) + && shardSnapshotStatus.state() == ShardState.INIT && snapshotShards.containsKey(shardId) == false) { logger.trace("[{}] - Adding shard to the queue", shardId); if (startedShards == null) { @@ -284,7 +286,7 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements } else { // due to CS batching we might have missed the INIT state and straight went into ABORTED // notify master that abort has completed by moving to FAILED - if (shard.value.state() == State.ABORTED) { + if (shard.value.state() == ShardState.ABORTED) { notifyFailedSnapshotShard(snapshot, shard.key, shard.value.reason()); } } @@ -477,12 +479,14 @@ public class SnapshotShardsService extends AbstractLifecycleComponent implements /** Notify the master node that the given shard has been successfully snapshotted **/ private void notifySuccessfulSnapshotShard(final Snapshot snapshot, final ShardId shardId) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.SUCCESS)); + sendSnapshotShardUpdate(snapshot, shardId, + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.SUCCESS)); } /** Notify the master node that the given shard failed to be snapshotted **/ private void notifyFailedSnapshotShard(final Snapshot snapshot, final ShardId shardId, final String failure) { - sendSnapshotShardUpdate(snapshot, shardId, new ShardSnapshotStatus(clusterService.localNode().getId(), State.FAILED, failure)); + sendSnapshotShardUpdate(snapshot, shardId, + new ShardSnapshotStatus(clusterService.localNode().getId(), ShardState.FAILED, failure)); } /** Updates the shard snapshot status by sending a {@link UpdateIndexShardSnapshotStatusRequest} to the master node */ diff --git a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java index 11bf6f07831..b1d365f7ff1 100644 --- a/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java +++ b/server/src/main/java/org/elasticsearch/snapshots/SnapshotsService.java @@ -37,6 +37,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotDeletionsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -774,7 +775,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus logger.warn("failing snapshot of shard [{}] on closed node [{}]", shardEntry.key, shardStatus.nodeId()); shards.put(shardEntry.key, - new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "node shutdown")); + new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "node shutdown")); } } } @@ -870,7 +871,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus for (ObjectObjectCursor shardEntry : snapshotShards) { ShardSnapshotStatus shardStatus = shardEntry.value; ShardId shardId = shardEntry.key; - if (shardStatus.state() == State.WAITING) { + if (shardStatus.state() == ShardState.WAITING) { IndexRoutingTable indexShardRoutingTable = routingTable.index(shardId.getIndex()); if (indexShardRoutingTable != null) { IndexShardRoutingTable shardRouting = indexShardRoutingTable.shard(shardId.id()); @@ -891,7 +892,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus // Shard that we were waiting for went into unassigned state or disappeared - giving up snapshotChanged = true; logger.warn("failing snapshot of shard [{}] on unassigned shard [{}]", shardId, shardStatus.nodeId()); - shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), State.FAILED, "shard is unassigned")); + shards.put(shardId, new ShardSnapshotStatus(shardStatus.nodeId(), ShardState.FAILED, "shard is unassigned")); } else { shards.put(shardId, shardStatus); } @@ -941,7 +942,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus Set missing = new HashSet<>(); Set closed = new HashSet<>(); for (ObjectObjectCursor entry : shards) { - if (entry.value.state() == State.MISSING) { + if (entry.value.state() == ShardState.MISSING) { if (metaData.hasIndex(entry.key.getIndex().getName()) && metaData.getIndexSafe(entry.key.getIndex()).getState() == IndexMetaData.State.CLOSE) { closed.add(entry.key.getIndex().getName()); @@ -1192,7 +1193,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus for (ObjectObjectCursor shardEntry : snapshotEntry.shards()) { ShardSnapshotStatus status = shardEntry.value; if (status.state().completed() == false) { - status = new ShardSnapshotStatus(status.nodeId(), State.ABORTED, "aborted by snapshot deletion"); + status = new ShardSnapshotStatus(status.nodeId(), ShardState.ABORTED, "aborted by snapshot deletion"); } shardsBuilder.put(shardEntry.key, status); } @@ -1382,7 +1383,7 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus if (indexMetaData == null) { // The index was deleted before we managed to start the snapshot - mark it as missing. builder.put(new ShardId(indexName, IndexMetaData.INDEX_UUID_NA_VALUE, 0), - new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "missing index")); + new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing index")); } else { IndexRoutingTable indexRoutingTable = clusterState.getRoutingTable().index(indexName); for (int i = 0; i < indexMetaData.getNumberOfShards(); i++) { @@ -1391,18 +1392,18 @@ public class SnapshotsService extends AbstractLifecycleComponent implements Clus ShardRouting primary = indexRoutingTable.shard(i).primaryShard(); if (primary == null || !primary.assignedToNode()) { builder.put(shardId, - new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, "primary shard is not allocated")); + new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "primary shard is not allocated")); } else if (primary.relocating() || primary.initializing()) { - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.WAITING)); + builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.WAITING)); } else if (!primary.started()) { builder.put(shardId, - new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), State.MISSING, + new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId(), ShardState.MISSING, "primary shard hasn't been started yet")); } else { builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(primary.currentNodeId())); } } else { - builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, State.MISSING, + builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(null, ShardState.MISSING, "missing routing table")); } } diff --git a/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java b/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java index fcf70909b31..eac06c786a2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/SnapshotsInProgressTests.java @@ -21,6 +21,7 @@ package org.elasticsearch.cluster; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.shard.ShardId; @@ -55,11 +56,11 @@ public class SnapshotsInProgressTests extends ESTestCase { ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); // test more than one waiting shard in an index - shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING)); - shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING)); + shards.put(new ShardId(idx1Name, idx1UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); + shards.put(new ShardId(idx1Name, idx1UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); shards.put(new ShardId(idx1Name, idx1UUID, 2), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); // test exactly one waiting shard in an index - shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), State.WAITING)); + shards.put(new ShardId(idx2Name, idx2UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), ShardState.WAITING)); shards.put(new ShardId(idx2Name, idx2UUID, 1), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); // test no waiting shards in an index shards.put(new ShardId(idx3Name, idx3UUID, 0), new ShardSnapshotStatus(randomAlphaOfLength(2), randomNonWaitingState(), "")); @@ -72,7 +73,7 @@ public class SnapshotsInProgressTests extends ESTestCase { assertFalse(waitingIndices.containsKey(idx3Name)); } - private State randomNonWaitingState() { - return randomFrom(Arrays.stream(State.values()).filter(s -> s != State.WAITING).collect(Collectors.toSet())); + private ShardState randomNonWaitingState() { + return randomFrom(Arrays.stream(ShardState.values()).filter(s -> s != ShardState.WAITING).collect(Collectors.toSet())); } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java index 001a83710dc..8e2fff4e275 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SharedClusterSnapshotRestoreIT.java @@ -54,6 +54,7 @@ import org.elasticsearch.cluster.RestoreInProgress; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; import org.elasticsearch.cluster.SnapshotsInProgress.ShardSnapshotStatus; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.cluster.block.ClusterBlocks; import org.elasticsearch.cluster.metadata.IndexMetaData; @@ -2702,9 +2703,9 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas public ClusterState execute(ClusterState currentState) { // Simulate orphan snapshot ImmutableOpenMap.Builder shards = ImmutableOpenMap.builder(); - shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted")); - shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted")); - shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", State.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 0), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 1), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); + shards.put(new ShardId(idxName, "_na_", 2), new ShardSnapshotStatus("unknown-node", ShardState.ABORTED, "aborted")); List entries = new ArrayList<>(); entries.add(new Entry(new Snapshot(repositoryName, createSnapshotResponse.getSnapshotInfo().snapshotId()), diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java index 3f23c8f0a2d..6c8ddfb56c1 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotsInProgressSerializationTests.java @@ -24,6 +24,7 @@ import org.elasticsearch.cluster.ClusterState.Custom; import org.elasticsearch.cluster.Diff; import org.elasticsearch.cluster.SnapshotsInProgress; import org.elasticsearch.cluster.SnapshotsInProgress.Entry; +import org.elasticsearch.cluster.SnapshotsInProgress.ShardState; import org.elasticsearch.cluster.SnapshotsInProgress.State; import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; @@ -65,7 +66,7 @@ public class SnapshotsInProgressSerializationTests extends AbstractDiffableWireS for (int j = 0; j < shardsCount; j++) { ShardId shardId = new ShardId(new Index(randomAlphaOfLength(10), randomAlphaOfLength(10)), randomIntBetween(0, 10)); String nodeId = randomAlphaOfLength(10); - State shardState = randomFrom(State.values()); + ShardState shardState = randomFrom(ShardState.values()); builder.put(shardId, new SnapshotsInProgress.ShardSnapshotStatus(nodeId, shardState, shardState.failed() ? randomAlphaOfLength(10) : null)); }