From fe40f8f360b957fa07d8579eec905703b63ec03b Mon Sep 17 00:00:00 2001 From: itiyamas <45985097+itiyamas@users.noreply.github.com> Date: Thu, 17 Jun 2021 13:45:37 +0530 Subject: [PATCH] Decouples primaries_recoveries limit from concurrent recoveries limit. (#546) (#862) * Decouples initial primaries limit from node concurrent limits. Signed-off-by: Itiyama * Checkstyle fixes. Signed-off-by: Itiyama * Checkstyle test fixes. Signed-off-by: Itiyama * Review comments Signed-off-by: Itiyama * Fixes review comments. Signed-off-by: Itiyama --- .../cluster/routing/RoutingNodes.java | 75 +++++++++++++------ .../decider/ThrottlingAllocationDecider.java | 14 +--- .../allocation/AllocationPriorityTests.java | 16 ++-- .../allocation/AllocationServiceTests.java | 13 +++- .../allocation/ThrottlingAllocationTests.java | 3 +- 5 files changed, 75 insertions(+), 46 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index 5d1a11ea5e6..e15bc838b4a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -63,6 +63,7 @@ import java.util.Map; import java.util.NoSuchElementException; import java.util.Queue; import java.util.Set; +import java.util.function.Function; import java.util.function.Predicate; /** @@ -96,6 +97,7 @@ public class RoutingNodes implements Iterable { private final Map> nodesPerAttributeNames = new HashMap<>(); private final Map recoveriesPerNode = new HashMap<>(); + private final Map primaryRecoveriesPerNode = new HashMap<>(); public RoutingNodes(ClusterState clusterState) { this(clusterState, true); @@ -175,12 +177,20 @@ public class RoutingNodes implements Iterable { } private void updateRecoveryCounts(final ShardRouting routing, final boolean increment, @Nullable final ShardRouting primary) { + final int howMany = increment ? 1 : -1; assert routing.initializing() : "routing must be initializing: " + routing; // TODO: check primary == null || primary.active() after all tests properly add ReplicaAfterPrimaryActiveAllocationDecider assert primary == null || primary.assignedToNode() : "shard is initializing but its primary is not assigned to a node"; + // Primary shard routing, excluding the relocating primaries. + if(routing.primary() && (primary == null || primary == routing)) { + assert routing.relocatingNodeId() == null: "Routing must be a non relocating primary"; + Recoveries.getOrAdd(primaryRecoveriesPerNode, routing.currentNodeId()).addIncoming(howMany); + return; + } + Recoveries.getOrAdd(recoveriesPerNode, routing.currentNodeId()).addIncoming(howMany); if (routing.recoverySource().getType() == RecoverySource.Type.PEER) { @@ -209,6 +219,10 @@ public class RoutingNodes implements Iterable { return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); } + public int getInitialPrimariesIncomingRecoveries(String nodeId) { + return primaryRecoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); + } + public int getOutgoingRecoveries(String nodeId) { return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing(); } @@ -1092,30 +1106,10 @@ public class RoutingNodes implements Iterable { } } - for (Map.Entry recoveries : routingNodes.recoveriesPerNode.entrySet()) { - String node = recoveries.getKey(); - final Recoveries value = recoveries.getValue(); - int incoming = 0; - int outgoing = 0; - RoutingNode routingNode = routingNodes.nodesToShards.get(node); - if (routingNode != null) { // node might have dropped out of the cluster - for (ShardRouting routing : routingNode) { - if (routing.initializing()) { - incoming++; - } - if (routing.primary() && routing.isRelocationTarget() == false) { - for (ShardRouting assigned : routingNodes.assignedShards.get(routing.shardId())) { - if (assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) { - outgoing++; - } - } - } - } - } - assert incoming == value.incoming : incoming + " != " + value.incoming + " node: " + routingNode; - assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode; - } - + assertRecoveriesPerNode(routingNodes, routingNodes.recoveriesPerNode, true, + x -> !isNonRelocatingPrimary(x)); + assertRecoveriesPerNode(routingNodes, routingNodes.primaryRecoveriesPerNode, false, + x -> isNonRelocatingPrimary(x)); assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() : "Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + @@ -1135,6 +1129,39 @@ public class RoutingNodes implements Iterable { return true; } + private static void assertRecoveriesPerNode(RoutingNodes routingNodes, Map recoveriesPerNode, + boolean verifyOutgoingRecoveries, + Function incomingCountFilter) { + for (Map.Entry recoveries : recoveriesPerNode.entrySet()) { + String node = recoveries.getKey(); + final Recoveries value = recoveries.getValue(); + int incoming = 0; + int outgoing = 0; + RoutingNode routingNode = routingNodes.nodesToShards.get(node); + if (routingNode != null) { // node might have dropped out of the cluster + for (ShardRouting routing : routingNode) { + if (routing.initializing() && incomingCountFilter.apply(routing)) + incoming++; + + if (verifyOutgoingRecoveries && routing.primary() && routing.isRelocationTarget() == false) { + for (ShardRouting assigned : routingNodes.assignedShards.get(routing.shardId())) { + if (assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) { + outgoing++; + } + } + } + } + } + + assert incoming == value.incoming : incoming + " != " + value.incoming + " node: " + routingNode; + assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode; + } + } + + private static boolean isNonRelocatingPrimary(ShardRouting routing) { + return routing.primary() && routing.relocatingNodeId() == null; + } + private void ensureMutable() { if (readOnly) { throw new IllegalStateException("can't modify RoutingNodes - readonly"); diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index 373506c13cf..d8df077eadd 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -39,7 +39,6 @@ import org.opensearch.cluster.routing.RoutingNode; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.cluster.routing.allocation.RoutingAllocation; -import org.opensearch.cluster.routing.ShardRoutingState; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; @@ -58,7 +57,8 @@ import static org.opensearch.cluster.routing.allocation.decider.Decision.YES; * node. The default is {@code 4} *
  • {@code cluster.routing.allocation.node_concurrent_recoveries} - * restricts the number of total concurrent shards initializing on a single node. The - * default is {@code 2}
  • + * default is {@code 2}. Please note that this limit excludes the initial primaries + * recovery operations per node. * *

    * If one of the above thresholds is exceeded per node this allocation decider @@ -135,14 +135,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider { // primary is unassigned, means we are going to do recovery from store, snapshot or local shards // count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries - int primariesInRecovery = 0; - for (ShardRouting shard : node.shardsWithState(ShardRoutingState.INITIALIZING)) { - // when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node* - // we only count initial recoveries here, so we need to make sure that relocating node is null - if (shard.primary() && shard.relocatingNodeId() == null) { - primariesInRecovery++; - } - } + int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId()); + if (primariesInRecovery >= primariesInitialRecoveries) { // TODO: Should index creation not be throttled for primary shards? return allocation.decision(THROTTLE, NAME, diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java index b393c0c7877..15ea5bb8573 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java @@ -42,6 +42,9 @@ import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; +import java.util.List; +import java.util.stream.Collectors; + import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; public class AllocationPriorityTests extends OpenSearchAllocationTestCase { @@ -95,18 +98,15 @@ public class AllocationPriorityTests extends OpenSearchAllocationTestCase { assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName()); clusterState = startInitializingShardsAndReroute(allocation, clusterState); - assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); - assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName()); - assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName()); + assertEquals(4, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + List indices = clusterState.getRoutingNodes().shardsWithState(INITIALIZING).stream(). + map(x->x.getIndexName()).collect(Collectors.toList()); + assertTrue(indices.contains(lowPriorityName)); + assertTrue(indices.contains(highPriorityName)); clusterState = startInitializingShardsAndReroute(allocation, clusterState); assertEquals(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).toString(),2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); - assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName()); - assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName()); - - clusterState = startInitializingShardsAndReroute(allocation, clusterState); - assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName()); assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index d6fa079c4e6..187e6fb26cb 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -213,6 +213,7 @@ public class AllocationServiceTests extends OpenSearchTestCase { final RoutingTable routingTable1 = reroutedState1.routingTable(); // the test harness only permits one recovery per node, so we must have allocated all the high-priority primaries and one of the // medium-priority ones + assertThat(routingTable1.shardsWithState(ShardRoutingState.INITIALIZING), empty()); assertThat(routingTable1.shardsWithState(ShardRoutingState.RELOCATING), empty()); assertTrue(routingTable1.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary)); @@ -223,22 +224,28 @@ public class AllocationServiceTests extends OpenSearchTestCase { final ClusterState reroutedState2 = rerouteAndStartShards(allocationService, reroutedState1); final RoutingTable routingTable2 = reroutedState2.routingTable(); - // this reroute starts the one remaining medium-priority primary and both of the low-priority ones, but no replicas + + // this reroute starts the one remaining medium-priority primary and both of the low-priority ones, + // and also 1 medium priority replica assertThat(routingTable2.shardsWithState(ShardRoutingState.INITIALIZING), empty()); assertThat(routingTable2.shardsWithState(ShardRoutingState.RELOCATING), empty()); - assertTrue(routingTable2.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary)); assertTrue(routingTable2.index("highPriority").allPrimaryShardsActive()); assertTrue(routingTable2.index("mediumPriority").allPrimaryShardsActive()); + assertThat(routingTable2.index("mediumPriority").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3)); + assertThat(routingTable2.index("mediumPriority").shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1)); assertTrue(routingTable2.index("lowPriority").allPrimaryShardsActive()); assertThat(routingTable2.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty()); final ClusterState reroutedState3 = rerouteAndStartShards(allocationService, reroutedState2); final RoutingTable routingTable3 = reroutedState3.routingTable(); - // this reroute starts the two medium-priority replicas since their allocator permits this + + // this reroute starts the one remaining medium-priority replica assertThat(routingTable3.shardsWithState(ShardRoutingState.INITIALIZING), empty()); assertThat(routingTable3.shardsWithState(ShardRoutingState.RELOCATING), empty()); assertTrue(routingTable3.index("highPriority").allPrimaryShardsActive()); + assertTrue(routingTable3.index("mediumPriority").allPrimaryShardsActive()); assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.UNASSIGNED), empty()); + assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4)); assertTrue(routingTable3.index("lowPriority").allPrimaryShardsActive()); assertThat(routingTable3.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty()); } diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java index dbadeeb630b..ecb0ebd01a4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -222,7 +222,8 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(5)); assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(4)); - assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 5); + assertEquals(clusterState.getRoutingNodes().getInitialPrimariesIncomingRecoveries("node1"), 5); + assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0); logger.info("start initializing, all primaries should be started"); clusterState = startInitializingShardsAndReroute(strategy, clusterState);