Decouples primaries_recoveries limit from concurrent recoveries limit. (#546)
* Decouples initial primaries limit from node concurrent limits. Signed-off-by: Itiyama <itiyamas@amazon.com>
* Checkstyle fixes. Signed-off-by: Itiyama <itiyamas@amazon.com>
* Checkstyle test fixes. Signed-off-by: Itiyama <itiyamas@amazon.com>
* Review comments. Signed-off-by: Itiyama <itiyamas@amazon.com>
* Fixes review comments. Signed-off-by: Itiyama <itiyamas@amazon.com>
This commit is contained in:
parent
6f893ed1cd
commit
c116062909
|
@ -63,6 +63,7 @@ import java.util.Map;
|
|||
import java.util.NoSuchElementException;
|
||||
import java.util.Queue;
|
||||
import java.util.Set;
|
||||
import java.util.function.Function;
|
||||
import java.util.function.Predicate;
|
||||
|
||||
/**
|
||||
|
@ -96,6 +97,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
|
||||
private final Map<String, ObjectIntHashMap<String>> nodesPerAttributeNames = new HashMap<>();
|
||||
private final Map<String, Recoveries> recoveriesPerNode = new HashMap<>();
|
||||
private final Map<String, Recoveries> primaryRecoveriesPerNode = new HashMap<>();
|
||||
|
||||
public RoutingNodes(ClusterState clusterState) {
|
||||
this(clusterState, true);
|
||||
|
@ -175,12 +177,20 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
private void updateRecoveryCounts(final ShardRouting routing, final boolean increment, @Nullable final ShardRouting primary) {
|
||||
|
||||
final int howMany = increment ? 1 : -1;
|
||||
assert routing.initializing() : "routing must be initializing: " + routing;
|
||||
// TODO: check primary == null || primary.active() after all tests properly add ReplicaAfterPrimaryActiveAllocationDecider
|
||||
assert primary == null || primary.assignedToNode() :
|
||||
"shard is initializing but its primary is not assigned to a node";
|
||||
|
||||
// Primary shard routing, excluding the relocating primaries.
|
||||
if(routing.primary() && (primary == null || primary == routing)) {
|
||||
assert routing.relocatingNodeId() == null: "Routing must be a non relocating primary";
|
||||
Recoveries.getOrAdd(primaryRecoveriesPerNode, routing.currentNodeId()).addIncoming(howMany);
|
||||
return;
|
||||
}
|
||||
|
||||
Recoveries.getOrAdd(recoveriesPerNode, routing.currentNodeId()).addIncoming(howMany);
|
||||
|
||||
if (routing.recoverySource().getType() == RecoverySource.Type.PEER) {
|
||||
|
@ -209,6 +219,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming();
|
||||
}
|
||||
|
||||
public int getInitialPrimariesIncomingRecoveries(String nodeId) {
|
||||
return primaryRecoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming();
|
||||
}
|
||||
|
||||
public int getOutgoingRecoveries(String nodeId) {
|
||||
return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing();
|
||||
}
|
||||
|
@ -1092,30 +1106,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
}
|
||||
|
||||
for (Map.Entry<String, Recoveries> recoveries : routingNodes.recoveriesPerNode.entrySet()) {
|
||||
String node = recoveries.getKey();
|
||||
final Recoveries value = recoveries.getValue();
|
||||
int incoming = 0;
|
||||
int outgoing = 0;
|
||||
RoutingNode routingNode = routingNodes.nodesToShards.get(node);
|
||||
if (routingNode != null) { // node might have dropped out of the cluster
|
||||
for (ShardRouting routing : routingNode) {
|
||||
if (routing.initializing()) {
|
||||
incoming++;
|
||||
}
|
||||
if (routing.primary() && routing.isRelocationTarget() == false) {
|
||||
for (ShardRouting assigned : routingNodes.assignedShards.get(routing.shardId())) {
|
||||
if (assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) {
|
||||
outgoing++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
assert incoming == value.incoming : incoming + " != " + value.incoming + " node: " + routingNode;
|
||||
assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode;
|
||||
}
|
||||
|
||||
assertRecoveriesPerNode(routingNodes, routingNodes.recoveriesPerNode, true,
|
||||
x -> !isNonRelocatingPrimary(x));
|
||||
assertRecoveriesPerNode(routingNodes, routingNodes.primaryRecoveriesPerNode, false,
|
||||
x -> isNonRelocatingPrimary(x));
|
||||
|
||||
assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() :
|
||||
"Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" +
|
||||
|
@ -1135,6 +1129,39 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
return true;
|
||||
}
|
||||
|
||||
private static void assertRecoveriesPerNode(RoutingNodes routingNodes, Map<String, Recoveries> recoveriesPerNode,
|
||||
boolean verifyOutgoingRecoveries,
|
||||
Function<ShardRouting, Boolean> incomingCountFilter) {
|
||||
for (Map.Entry<String, Recoveries> recoveries : recoveriesPerNode.entrySet()) {
|
||||
String node = recoveries.getKey();
|
||||
final Recoveries value = recoveries.getValue();
|
||||
int incoming = 0;
|
||||
int outgoing = 0;
|
||||
RoutingNode routingNode = routingNodes.nodesToShards.get(node);
|
||||
if (routingNode != null) { // node might have dropped out of the cluster
|
||||
for (ShardRouting routing : routingNode) {
|
||||
if (routing.initializing() && incomingCountFilter.apply(routing))
|
||||
incoming++;
|
||||
|
||||
if (verifyOutgoingRecoveries && routing.primary() && routing.isRelocationTarget() == false) {
|
||||
for (ShardRouting assigned : routingNodes.assignedShards.get(routing.shardId())) {
|
||||
if (assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) {
|
||||
outgoing++;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
assert incoming == value.incoming : incoming + " != " + value.incoming + " node: " + routingNode;
|
||||
assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode;
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean isNonRelocatingPrimary(ShardRouting routing) {
|
||||
return routing.primary() && routing.relocatingNodeId() == null;
|
||||
}
|
||||
|
||||
private void ensureMutable() {
|
||||
if (readOnly) {
|
||||
throw new IllegalStateException("can't modify RoutingNodes - readonly");
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.opensearch.cluster.routing.RoutingNode;
|
|||
import org.opensearch.cluster.routing.ShardRouting;
|
||||
import org.opensearch.cluster.routing.UnassignedInfo;
|
||||
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
|
||||
import org.opensearch.cluster.routing.ShardRoutingState;
|
||||
import org.opensearch.common.settings.ClusterSettings;
|
||||
import org.opensearch.common.settings.Setting;
|
||||
import org.opensearch.common.settings.Setting.Property;
|
||||
|
@ -58,7 +57,8 @@ import static org.opensearch.cluster.routing.allocation.decider.Decision.YES;
|
|||
* node. The default is {@code 4}</li>
|
||||
* <li>{@code cluster.routing.allocation.node_concurrent_recoveries} -
|
||||
* restricts the number of total concurrent shards initializing on a single node. The
|
||||
* default is {@code 2}</li>
|
||||
* default is {@code 2}. Please note that this limit excludes the initial primaries
|
||||
* recovery operations per node.</li>
|
||||
* </ul>
|
||||
* <p>
|
||||
* If one of the above thresholds is exceeded per node this allocation decider
|
||||
|
@ -135,14 +135,8 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
|
|||
// primary is unassigned, means we are going to do recovery from store, snapshot or local shards
|
||||
// count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries
|
||||
|
||||
int primariesInRecovery = 0;
|
||||
for (ShardRouting shard : node.shardsWithState(ShardRoutingState.INITIALIZING)) {
|
||||
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
|
||||
// we only count initial recoveries here, so we need to make sure that relocating node is null
|
||||
if (shard.primary() && shard.relocatingNodeId() == null) {
|
||||
primariesInRecovery++;
|
||||
}
|
||||
}
|
||||
int primariesInRecovery = allocation.routingNodes().getInitialPrimariesIncomingRecoveries(node.nodeId());
|
||||
|
||||
if (primariesInRecovery >= primariesInitialRecoveries) {
|
||||
// TODO: Should index creation not be throttled for primary shards?
|
||||
return allocation.decision(THROTTLE, NAME,
|
||||
|
|
|
@ -42,6 +42,9 @@ import org.opensearch.cluster.routing.allocation.AllocationService;
|
|||
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
|
||||
import org.opensearch.common.settings.Settings;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
|
||||
|
||||
public class AllocationPriorityTests extends OpenSearchAllocationTestCase {
|
||||
|
@ -95,18 +98,15 @@ public class AllocationPriorityTests extends OpenSearchAllocationTestCase {
|
|||
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName());
|
||||
|
||||
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
|
||||
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
|
||||
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName());
|
||||
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName());
|
||||
assertEquals(4, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
|
||||
List<String> indices = clusterState.getRoutingNodes().shardsWithState(INITIALIZING).stream().
|
||||
map(x->x.getIndexName()).collect(Collectors.toList());
|
||||
assertTrue(indices.contains(lowPriorityName));
|
||||
assertTrue(indices.contains(highPriorityName));
|
||||
|
||||
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
|
||||
assertEquals(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).toString(),2,
|
||||
clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
|
||||
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName());
|
||||
assertEquals(highPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName());
|
||||
|
||||
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
|
||||
assertEquals(2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size());
|
||||
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName());
|
||||
assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName());
|
||||
|
||||
|
|
|
@ -213,6 +213,7 @@ public class AllocationServiceTests extends OpenSearchTestCase {
|
|||
final RoutingTable routingTable1 = reroutedState1.routingTable();
|
||||
// the test harness only permits one recovery per node, so we must have allocated all the high-priority primaries and one of the
|
||||
// medium-priority ones
|
||||
|
||||
assertThat(routingTable1.shardsWithState(ShardRoutingState.INITIALIZING), empty());
|
||||
assertThat(routingTable1.shardsWithState(ShardRoutingState.RELOCATING), empty());
|
||||
assertTrue(routingTable1.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary));
|
||||
|
@ -223,22 +224,28 @@ public class AllocationServiceTests extends OpenSearchTestCase {
|
|||
|
||||
final ClusterState reroutedState2 = rerouteAndStartShards(allocationService, reroutedState1);
|
||||
final RoutingTable routingTable2 = reroutedState2.routingTable();
|
||||
// this reroute starts the one remaining medium-priority primary and both of the low-priority ones, but no replicas
|
||||
|
||||
// this reroute starts the one remaining medium-priority primary and both of the low-priority ones,
|
||||
// and also 1 medium priority replica
|
||||
assertThat(routingTable2.shardsWithState(ShardRoutingState.INITIALIZING), empty());
|
||||
assertThat(routingTable2.shardsWithState(ShardRoutingState.RELOCATING), empty());
|
||||
assertTrue(routingTable2.shardsWithState(ShardRoutingState.STARTED).stream().allMatch(ShardRouting::primary));
|
||||
assertTrue(routingTable2.index("highPriority").allPrimaryShardsActive());
|
||||
assertTrue(routingTable2.index("mediumPriority").allPrimaryShardsActive());
|
||||
assertThat(routingTable2.index("mediumPriority").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(3));
|
||||
assertThat(routingTable2.index("mediumPriority").shardsWithState(ShardRoutingState.UNASSIGNED).size(), equalTo(1));
|
||||
assertTrue(routingTable2.index("lowPriority").allPrimaryShardsActive());
|
||||
assertThat(routingTable2.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty());
|
||||
|
||||
final ClusterState reroutedState3 = rerouteAndStartShards(allocationService, reroutedState2);
|
||||
final RoutingTable routingTable3 = reroutedState3.routingTable();
|
||||
// this reroute starts the two medium-priority replicas since their allocator permits this
|
||||
|
||||
// this reroute starts the one remaining medium-priority replica
|
||||
assertThat(routingTable3.shardsWithState(ShardRoutingState.INITIALIZING), empty());
|
||||
assertThat(routingTable3.shardsWithState(ShardRoutingState.RELOCATING), empty());
|
||||
assertTrue(routingTable3.index("highPriority").allPrimaryShardsActive());
|
||||
assertTrue(routingTable3.index("mediumPriority").allPrimaryShardsActive());
|
||||
assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.UNASSIGNED), empty());
|
||||
assertThat(routingTable3.index("mediumPriority").shardsWithState(ShardRoutingState.STARTED).size(), equalTo(4));
|
||||
assertTrue(routingTable3.index("lowPriority").allPrimaryShardsActive());
|
||||
assertThat(routingTable3.index("invalid").shardsWithState(ShardRoutingState.STARTED), empty());
|
||||
}
|
||||
|
|
|
@ -222,7 +222,8 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase {
|
|||
assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0));
|
||||
assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(5));
|
||||
assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(4));
|
||||
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 5);
|
||||
assertEquals(clusterState.getRoutingNodes().getInitialPrimariesIncomingRecoveries("node1"), 5);
|
||||
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
|
||||
|
||||
logger.info("start initializing, all primaries should be started");
|
||||
clusterState = startInitializingShardsAndReroute(strategy, clusterState);
|
||||
|
|
Loading…
Reference in New Issue