From d330ca6559b8042764efa92c1667e892bb2489d9 Mon Sep 17 00:00:00 2001 From: Abbas Hussain Date: Fri, 21 May 2021 02:41:49 +0530 Subject: [PATCH] Handle inefficiencies while fetching the delayed unassigned shards during cluster health (#588) (#730) Signed-off-by: Meet Shah Co-authored-by: Meet Shah <48720201+meetshah777@users.noreply.github.com> --- .../cluster/routing/IndexRoutingTable.java | 9 ++++++ .../cluster/routing/IndexShardRoutingTable.java | 11 ++++++++ .../cluster/routing/RoutingTable.java | 8 ++++++ .../cluster/routing/UnassignedInfo.java | 10 ++----- .../cluster/routing/RoutingTableTests.java | 28 +++++++++++++++++++ 5 files changed, 59 insertions(+), 7 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java index 10767af5048..4ec2cec29ae 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexRoutingTable.java @@ -59,6 +59,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.function.Predicate; /** * The {@link IndexRoutingTable} represents routing information for a single @@ -278,6 +279,14 @@ public class IndexRoutingTable extends AbstractDiffable<IndexRoutingTable> imple return shards; } + public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) { + int count = 0; + for (IndexShardRoutingTable shardRoutingTable : this) { + count += shardRoutingTable.shardsMatchingPredicateCount(predicate); + } + return count; + } + /** * Returns an unordered iterator over all active shards (including replicas). 
*/ diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 80edea9de18..27aeaf100db 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -56,6 +56,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.function.Predicate; import static java.util.Collections.emptyMap; @@ -672,6 +673,16 @@ public class IndexShardRoutingTable implements Iterable<ShardRouting> { return shards; } + public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) { + int count = 0; + for (ShardRouting shardEntry : this) { + if (predicate.test(shardEntry)) { + count++; + } + } + return count; + } + public static class Builder { private ShardId shardId; diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java index 4fd715a0e4a..ec5903cb424 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingTable.java @@ -196,6 +196,14 @@ public class RoutingTable implements Iterable<IndexRoutingTable>, Diffable<Routi return shards; } + public int shardsMatchingPredicateCount(Predicate<ShardRouting> predicate) { + int count = 0; + for (IndexRoutingTable indexRoutingTable : this) { + count += indexRoutingTable.shardsMatchingPredicateCount(predicate); + } + return count; + } + /** * All the shards (replicas) for all indices in this routing table. 
* diff --git a/server/src/main/java/org/opensearch/cluster/routing/UnassignedInfo.java b/server/src/main/java/org/opensearch/cluster/routing/UnassignedInfo.java index d92e82cd0ae..587a010f160 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/UnassignedInfo.java +++ b/server/src/main/java/org/opensearch/cluster/routing/UnassignedInfo.java @@ -58,6 +58,7 @@ import java.util.List; import java.util.Locale; import java.util.Objects; import java.util.Set; +import java.util.function.Predicate; /** * Holds additional information as to why the shard is in unassigned state. @@ -413,13 +414,8 @@ public final class UnassignedInfo implements ToXContentFragment, Writeable { * Returns the number of shards that are unassigned and currently being delayed. */ public static int getNumberOfDelayedUnassigned(ClusterState state) { - int count = 0; - for (ShardRouting shard : state.routingTable().shardsWithState(ShardRoutingState.UNASSIGNED)) { - if (shard.unassignedInfo().isDelayed()) { - count++; - } - } - return count; + Predicate<ShardRouting> predicate = s -> s.state() == ShardRoutingState.UNASSIGNED && s.unassignedInfo().isDelayed(); + return state.routingTable().shardsMatchingPredicateCount(predicate); } /** diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index b4737a4a5a3..7225892e26b 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -50,6 +50,7 @@ import org.junit.Before; import java.util.Arrays; import java.util.Set; import java.util.stream.Collectors; +import java.util.function.Predicate; import static org.opensearch.cluster.routing.ShardRoutingState.UNASSIGNED; import static org.hamcrest.Matchers.containsString; @@ -183,6 +184,33 @@ public class RoutingTableTests extends OpenSearchAllocationTestCase { 
assertThat(clusterState.routingTable().shardsWithState(ShardRoutingState.STARTED).size(), is(this.totalNumberOfShards)); } + public void testShardsMatchingPredicateCount(){ + MockAllocationService allocation = createAllocationService(Settings.EMPTY, new DelayedShardsMockGatewayAllocator()); + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(1)) + .build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .routingTable(RoutingTable.builder().addAsNew(metadata.index("test1")).addAsNew(metadata.index("test2")).build()).build(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(); + clusterState = allocation.reroute(clusterState, "reroute"); + + Predicate<ShardRouting> predicate = s -> s.state() == ShardRoutingState.UNASSIGNED && s.unassignedInfo().isDelayed(); + assertThat(clusterState.routingTable().shardsMatchingPredicateCount(predicate), is(0)); + + // starting primaries + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + // starting replicas + clusterState = startInitializingShardsAndReroute(allocation, clusterState); + // remove node2 and reroute + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build(); + // make sure both replicas are marked as delayed (i.e. 
not reallocated) + clusterState = allocation.disassociateDeadNodes(clusterState, true, "reroute"); + assertThat(clusterState.routingTable().shardsMatchingPredicateCount(predicate), is(2)); + } + public void testActivePrimaryShardsGrouped() { assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], true).size(), is(0)); assertThat(this.emptyRoutingTable.activePrimaryShardsGrouped(new String[0], false).size(), is(0));