From 11babe77822f6c997080cf5b9774096498a515f9 Mon Sep 17 00:00:00 2001 From: itiyamas <45985097+itiyamas@users.noreply.github.com> Date: Thu, 17 Jun 2021 10:58:50 +0530 Subject: [PATCH] Decouple throttling limits for new and old indices. (#778) * Decouple throttling limits for new and old indices. Signed-off-by: Itiyama * Precommit fixes. Signed-off-by: Itiyama * Review comments. Signed-off-by: Itiyama * Review comments and test fix. Signed-off-by: Itiyama * Checkstyle fixes. Signed-off-by: Itiyama * Review comments. Signed-off-by: Itiyama --- .../cluster/routing/RoutingNodes.java | 66 +++- .../cluster/routing/ShardRouting.java | 7 + .../decider/ThrottlingAllocationDecider.java | 122 ++++-- .../common/settings/ClusterSettings.java | 1 + .../cluster/routing/RoutingTableTests.java | 3 + .../allocation/AllocationPriorityTests.java | 3 +- .../allocation/AllocationServiceTests.java | 3 + .../allocation/AwarenessAllocationTests.java | 12 + .../ConcurrentRebalanceRoutingTests.java | 2 + .../routing/allocation/IndexBalanceTests.java | 4 + .../NodeVersionAllocationDeciderTests.java | 3 + .../PreferPrimaryAllocationTests.java | 1 + ...yNotRelocatedWhileBeingRecoveredTests.java | 2 + .../allocation/RebalanceAfterActiveTests.java | 2 + .../RoutingNodesIntegrityTests.java | 4 + .../TenShardsOneReplicaRoutingTests.java | 2 + .../allocation/ThrottlingAllocationTests.java | 363 +++++++++++++++++- 17 files changed, 549 insertions(+), 51 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java index e15bc838b4a..ddaf8782805 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RoutingNodes.java @@ -97,7 +97,8 @@ public class RoutingNodes implements Iterable { private final Map> nodesPerAttributeNames = new HashMap<>(); private final Map recoveriesPerNode = new HashMap<>(); - private final 
Map<String, Recoveries> primaryRecoveriesPerNode = new HashMap<>(); + private final Map<String, Recoveries> initialReplicaRecoveries = new HashMap<>(); + private final Map<String, Recoveries> initialPrimaryRecoveries = new HashMap<>(); public RoutingNodes(ClusterState clusterState) { this(clusterState, true); @@ -187,46 +188,69 @@ public class RoutingNodes implements Iterable { // Primary shard routing, excluding the relocating primaries. if(routing.primary() && (primary == null || primary == routing)) { assert routing.relocatingNodeId() == null: "Routing must be a non relocating primary"; - Recoveries.getOrAdd(primaryRecoveriesPerNode, routing.currentNodeId()).addIncoming(howMany); + Recoveries.getOrAdd(initialPrimaryRecoveries, routing.currentNodeId()).addIncoming(howMany); return; } - Recoveries.getOrAdd(recoveriesPerNode, routing.currentNodeId()).addIncoming(howMany); + Recoveries.getOrAdd(getRecoveries(routing), routing.currentNodeId()).addIncoming(howMany); if (routing.recoverySource().getType() == RecoverySource.Type.PEER) { // add/remove corresponding outgoing recovery on node with primary shard if (primary == null) { throw new IllegalStateException("shard is peer recovering but primary is unassigned"); } - Recoveries.getOrAdd(recoveriesPerNode, primary.currentNodeId()).addOutgoing(howMany); + + Recoveries.getOrAdd(getRecoveries(routing), primary.currentNodeId()).addOutgoing(howMany); if (increment == false && routing.primary() && routing.relocatingNodeId() != null) { // primary is done relocating, move non-primary recoveries from old primary to new primary - int numRecoveringReplicas = 0; for (ShardRouting assigned : assignedShards(routing.shardId())) { if (assigned.primary() == false && assigned.initializing() && assigned.recoverySource().getType() == RecoverySource.Type.PEER) { - numRecoveringReplicas++; + Map<String, Recoveries> recoveriesToUpdate = getRecoveries(assigned); + Recoveries.getOrAdd(recoveriesToUpdate, routing.relocatingNodeId()).addOutgoing(-1); + Recoveries.getOrAdd(recoveriesToUpdate, 
routing.currentNodeId()).addOutgoing(1); } } - recoveriesPerNode.get(routing.relocatingNodeId()).addOutgoing(-numRecoveringReplicas); - recoveriesPerNode.get(routing.currentNodeId()).addOutgoing(numRecoveringReplicas); + } } } + private Map<String, Recoveries> getRecoveries(ShardRouting routing) { + if(routing.unassignedReasonIndexCreated() && !routing.primary()) { + return initialReplicaRecoveries; + } else { + return recoveriesPerNode; + } + } + public int getIncomingRecoveries(String nodeId) { return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); } public int getInitialPrimariesIncomingRecoveries(String nodeId) { - return primaryRecoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); + return initialPrimaryRecoveries.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); } public int getOutgoingRecoveries(String nodeId) { return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing(); } + /** + * Recoveries started on node as a result of new index creation. + */ + public int getInitialIncomingRecoveries(String nodeId) { + return initialReplicaRecoveries.getOrDefault(nodeId, Recoveries.EMPTY).getIncoming(); + } + + /** + * Recoveries started from node as a result of new index creation. 
+ */ + public int getInitialOutgoingRecoveries(String nodeId) { + return initialReplicaRecoveries.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing(); + } + @Nullable private ShardRouting findAssignedPrimaryIfPeerRecovery(ShardRouting routing) { ShardRouting primary = null; @@ -1106,10 +1130,10 @@ public class RoutingNodes implements Iterable { } } - assertRecoveriesPerNode(routingNodes, routingNodes.recoveriesPerNode, true, - x -> !isNonRelocatingPrimary(x)); - assertRecoveriesPerNode(routingNodes, routingNodes.primaryRecoveriesPerNode, false, + assertRecoveriesPerNode(routingNodes, routingNodes.initialPrimaryRecoveries, false, x -> isNonRelocatingPrimary(x)); + assertRecoveriesPerNode(routingNodes, Recoveries.unionRecoveries(routingNodes.recoveriesPerNode, + routingNodes.initialReplicaRecoveries), true, x -> !isNonRelocatingPrimary(x)); assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() : "Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + @@ -1237,5 +1261,23 @@ public class RoutingNodes implements Iterable { } return recoveries; } + + // used only for tests + static Map<String, Recoveries> unionRecoveries(Map<String, Recoveries> first, Map<String, Recoveries> second) { + Map<String, Recoveries> recoveries = new HashMap<>(); + addRecoveries(recoveries, first); + addRecoveries(recoveries, second); + return recoveries; + } + + private static void addRecoveries(Map<String, Recoveries> existingRecoveries, + Map<String, Recoveries> newRecoveries) { + for (String node : newRecoveries.keySet()) { + Recoveries r2 = newRecoveries.get(node); + Recoveries r1 = Recoveries.getOrAdd(existingRecoveries, node); + r1.addIncoming(r2.incoming); + r1.addOutgoing(r2.outgoing); + } + } } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java index cf01f00d3bf..09a8678ceda 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java +++ 
b/server/src/main/java/org/opensearch/cluster/routing/ShardRouting.java @@ -683,4 +683,11 @@ public final class ShardRouting implements Writeable, ToXContentObject { public RecoverySource recoverySource() { return recoverySource; } + + public boolean unassignedReasonIndexCreated() { + if (unassignedInfo != null) { + return unassignedInfo.getReason() == UnassignedInfo.Reason.INDEX_CREATED; + } + return false; + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java index d8df077eadd..928dd972a42 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/decider/ThrottlingAllocationDecider.java @@ -44,6 +44,9 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import java.util.Locale; +import java.util.function.BiFunction; + import static org.opensearch.cluster.routing.allocation.decider.Decision.THROTTLE; import static org.opensearch.cluster.routing.allocation.decider.Decision.YES; @@ -71,7 +74,7 @@ public class ThrottlingAllocationDecider extends AllocationDecider { private static final Logger logger = LogManager.getLogger(ThrottlingAllocationDecider.class); public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES = 2; - public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES = 4; + public static final int DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_RECOVERIES = 4; public static final String NAME = "throttling"; public static final Setting CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING = new Setting<>("cluster.routing.allocation.node_concurrent_recoveries", @@ -80,7 +83,11 @@ public 
class ThrottlingAllocationDecider extends AllocationDecider { Property.Dynamic, Property.NodeScope); public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING = Setting.intSetting("cluster.routing.allocation.node_initial_primaries_recoveries", - DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES, 0, + DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_RECOVERIES, 0, + Property.Dynamic, Property.NodeScope); + public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING = + Setting.intSetting("cluster.routing.allocation.node_initial_replicas_recoveries", + DEFAULT_CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_RECOVERIES, 0, Property.Dynamic, Property.NodeScope); public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING = new Setting<>( "cluster.routing.allocation.node_concurrent_incoming_recoveries", @@ -99,9 +106,11 @@ public class ThrottlingAllocationDecider extends AllocationDecider { private volatile int primariesInitialRecoveries; private volatile int concurrentIncomingRecoveries; private volatile int concurrentOutgoingRecoveries; + private volatile int replicasInitialRecoveries; public ThrottlingAllocationDecider(Settings settings, ClusterSettings clusterSettings) { - this.primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.get(settings); + primariesInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.get(settings); + replicasInitialRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.get(settings); concurrentIncomingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.get(settings); concurrentOutgoingRecoveries = CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.get(settings); @@ -111,10 +120,12 @@ public class ThrottlingAllocationDecider extends AllocationDecider { 
this::setConcurrentIncomingRecoverries); clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, this::setConcurrentOutgoingRecoverries); + clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING, + this::setReplicasInitialRecoveries); logger.debug("using node_concurrent_outgoing_recoveries [{}], node_concurrent_incoming_recoveries [{}], " + - "node_initial_primaries_recoveries [{}]", - concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries); + "node_initial_primaries_recoveries [{}], node_initial_replicas_recoveries [{}]", + concurrentOutgoingRecoveries, concurrentIncomingRecoveries, primariesInitialRecoveries, replicasInitialRecoveries); } private void setConcurrentIncomingRecoverries(int concurrentIncomingRecoveries) { @@ -128,6 +139,10 @@ public class ThrottlingAllocationDecider extends AllocationDecider { this.primariesInitialRecoveries = primariesInitialRecoveries; } + private void setReplicasInitialRecoveries(int replicasInitialRecoveries) { + this.replicasInitialRecoveries = replicasInitialRecoveries; + } + @Override public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { if (shardRouting.primary() && shardRouting.unassigned()) { @@ -150,36 +165,75 @@ public class ThrottlingAllocationDecider extends AllocationDecider { // Peer recovery assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER; - // Allocating a shard to this node will increase the incoming recoveries - int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId()); - if (currentInRecoveries >= concurrentIncomingRecoveries) { - return allocation.decision(THROTTLE, NAME, - "reached the limit of incoming shard recoveries [%d], cluster setting [%s=%d] (can also be set via [%s])", - currentInRecoveries, 
CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), - concurrentIncomingRecoveries, - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()); + if(shardRouting.unassignedReasonIndexCreated()) { + return allocateInitialShardCopies(shardRouting, node, allocation); } else { - // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node - ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); - if (primaryShard == null) { - return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); - } - int primaryNodeOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId()); - if (primaryNodeOutRecoveries >= concurrentOutgoingRecoveries) { - return allocation.decision(THROTTLE, NAME, - "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + - "cluster setting [%s=%d] (can also be set via [%s])", - primaryNodeOutRecoveries, primaryShard.currentNodeId(), - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), - concurrentOutgoingRecoveries, - CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()); - } else { - return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]", - primaryNodeOutRecoveries, - concurrentOutgoingRecoveries, - currentInRecoveries, - concurrentIncomingRecoveries); - } + return allocateNonInitialShardCopies(shardRouting, node, allocation); + } + } + } + + private Decision allocateInitialShardCopies(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + int currentInRecoveries = allocation.routingNodes().getInitialIncomingRecoveries(node.nodeId()); + assert shardRouting.unassignedReasonIndexCreated() && !shardRouting.primary(); + + return allocateShardCopies(shardRouting, allocation, 
currentInRecoveries, replicasInitialRecoveries, + (x,y) -> getInitialPrimaryNodeOutgoingRecoveries(x,y), replicasInitialRecoveries, + String.format(Locale.ROOT, "[%s=%d]", CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + replicasInitialRecoveries), + String.format(Locale.ROOT, "[%s=%d]", CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + replicasInitialRecoveries)); + } + + private Decision allocateNonInitialShardCopies(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + + assert !shardRouting.unassignedReasonIndexCreated(); + int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId()); + + return allocateShardCopies(shardRouting, allocation, currentInRecoveries, concurrentIncomingRecoveries, + (x,y) -> getPrimaryNodeOutgoingRecoveries(x,y), concurrentOutgoingRecoveries, + String.format(Locale.ROOT, "[%s=%d] (can also be set via [%s])", + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), + concurrentIncomingRecoveries, CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey()), + String.format(Locale.ROOT, "[%s=%d] (can also be set via [%s])", + CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), + concurrentOutgoingRecoveries, CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey())); + } + + private Integer getPrimaryNodeOutgoingRecoveries(ShardRouting shardRouting, RoutingAllocation allocation) { + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); + return allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId()); + } + + private Integer getInitialPrimaryNodeOutgoingRecoveries(ShardRouting shardRouting, RoutingAllocation allocation) { + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); + return 
allocation.routingNodes().getInitialOutgoingRecoveries(primaryShard.currentNodeId()); + } + + private Decision allocateShardCopies(ShardRouting shardRouting, RoutingAllocation allocation, int currentInRecoveries, + int inRecoveriesLimit, BiFunction<ShardRouting, RoutingAllocation, Integer> primaryNodeOutRecoveriesFunc, int outRecoveriesLimit, + String incomingRecoveriesSettingMsg, String outGoingRecoveriesSettingMsg) { + // Allocating a shard to this node will increase the incoming recoveries + if (currentInRecoveries >= inRecoveriesLimit) { + return allocation.decision(THROTTLE, NAME, + "reached the limit of incoming shard recoveries [%d], cluster setting %s", + currentInRecoveries, incomingRecoveriesSettingMsg); + } else { + // search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node + ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId()); + if (primaryShard == null) { + return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active"); + } + int primaryNodeOutRecoveries = primaryNodeOutRecoveriesFunc.apply(shardRouting, allocation); + if (primaryNodeOutRecoveries >= outRecoveriesLimit) { + return allocation.decision(THROTTLE, NAME, + "reached the limit of outgoing shard recoveries [%d] on the node [%s] which holds the primary, " + + "cluster setting %s", primaryNodeOutRecoveries, primaryShard.currentNodeId(), + outGoingRecoveriesSettingMsg); + } else { + return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]", + primaryNodeOutRecoveries, outRecoveriesLimit, currentInRecoveries, inRecoveriesLimit); } } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index af78113335d..5445c46e9fb 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ 
b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -256,6 +256,7 @@ public final class ClusterSettings extends AbstractScopedSettings { RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_FILE_CHUNKS_SETTING, RecoverySettings.INDICES_RECOVERY_MAX_CONCURRENT_OPERATIONS_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING, + ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING, ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING, diff --git a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java index 7225892e26b..559c67be4b7 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/RoutingTableTests.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.metadata.MetadataIndexStateService; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.node.DiscoveryNodes.Builder; import org.opensearch.cluster.routing.allocation.AllocationService; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import org.opensearch.index.Index; import org.opensearch.index.IndexNotFoundException; @@ -72,6 +73,8 @@ public class RoutingTableTests extends OpenSearchAllocationTestCase { private final AllocationService ALLOCATION_SERVICE = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", Integer.MAX_VALUE) // don't limit recoveries .put("cluster.routing.allocation.node_initial_primaries_recoveries", 
Integer.MAX_VALUE) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + Integer.MAX_VALUE) .build()); private ClusterState clusterState; diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java index 15ea5bb8573..225e31bcc2c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationPriorityTests.java @@ -38,7 +38,6 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingTable; -import org.opensearch.cluster.routing.allocation.AllocationService; import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; @@ -57,6 +56,7 @@ public class AllocationPriorityTests extends OpenSearchAllocationTestCase { AllocationService allocation = createAllocationService(Settings.builder(). 
put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 1) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1) .build()); @@ -107,6 +107,7 @@ public class AllocationPriorityTests extends OpenSearchAllocationTestCase { clusterState = startInitializingShardsAndReroute(allocation, clusterState); assertEquals(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).toString(),2, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size()); + assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(0).getIndexName()); assertEquals(lowPriorityName, clusterState.getRoutingNodes().shardsWithState(INITIALIZING).get(1).getIndexName()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java index 187e6fb26cb..94390559626 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AllocationServiceTests.java @@ -80,6 +80,8 @@ import static org.opensearch.cluster.routing.UnassignedInfo.AllocationStatus.DEC import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING; import static 
org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING; import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING; +import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING; + import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -137,6 +139,7 @@ public class AllocationServiceTests extends OpenSearchTestCase { // the effects of that depend on the earlier (random) allocations final Settings settings = Settings.builder() .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 1) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 1) .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(), 1) .put(CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), Integer.MAX_VALUE) .build(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java index 5490c7e2e66..fb280009077 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/AwarenessAllocationTests.java @@ -48,6 +48,7 @@ import org.opensearch.cluster.routing.allocation.command.CancelAllocationCommand import org.opensearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import 
org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import java.util.HashMap; @@ -69,6 +70,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded1() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -131,6 +133,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded2() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -195,6 +198,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) 
.put("cluster.routing.allocation.awareness.attributes", "rack_id") @@ -281,6 +285,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.awareness.attributes", "rack_id") @@ -369,6 +374,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded5() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -443,6 +449,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { public void testMoveShardOnceNewNodeWithAttributeAdded6() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "rack_id") .build()); @@ -519,6 +526,7 @@ public class AwarenessAllocationTests 
extends OpenSearchAllocationTestCase { public void testFullAwareness1() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") .put("cluster.routing.allocation.awareness.attributes", "rack_id") @@ -583,6 +591,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { public void testFullAwareness2() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") .put("cluster.routing.allocation.awareness.attributes", "rack_id") @@ -649,6 +658,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.awareness.force.rack_id.values", "1,2") @@ -729,6 +739,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { 
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b") .put("cluster.routing.allocation.awareness.attributes", "zone") .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) @@ -788,6 +799,7 @@ public class AwarenessAllocationTests extends OpenSearchAllocationTestCase { public void testUnassignedShardsWithUnbalancedZones() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.awareness.attributes", "zone") .build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java index 3b02266c8fd..db1ac9e94d4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ConcurrentRebalanceRoutingTests.java @@ -41,6 +41,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import static 
org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -56,6 +57,7 @@ public class ConcurrentRebalanceRoutingTests extends OpenSearchAllocationTestCas public void testClusterConcurrentRebalance() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 3) .build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexBalanceTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexBalanceTests.java index d03332f58e1..97ba67a0703 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexBalanceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/IndexBalanceTests.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -59,6 +60,7 @@ public class IndexBalanceTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") 
.put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); @@ -177,6 +179,7 @@ public class IndexBalanceTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); @@ -320,6 +323,7 @@ public class IndexBalanceTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index 6812982032c..ec18162451a 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -61,6 +61,7 @@ import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocat import org.opensearch.cluster.routing.allocation.decider.Decision; import 
org.opensearch.cluster.routing.allocation.decider.NodeVersionAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ReplicaAfterPrimaryActiveAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.ImmutableOpenMap; import org.opensearch.common.settings.Settings; @@ -99,6 +100,7 @@ public class NodeVersionAllocationDeciderTests extends OpenSearchAllocationTestC public void testDoNotAllocateFromPrimary() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); @@ -195,6 +197,7 @@ public class NodeVersionAllocationDeciderTests extends OpenSearchAllocationTestC public void testRandom() { AllocationService service = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java index f3811ebe3bb..549824ba992 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/PreferPrimaryAllocationTests.java @@ -54,6 +54,7 @@ public class PreferPrimaryAllocationTests extends OpenSearchAllocationTestCase { logger.info("create an allocation with 1 initial recoveries"); AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 1) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 1) .build()); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java index 285b8ab37e6..00533698380 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/PrimaryNotRelocatedWhileBeingRecoveredTests.java @@ -42,6 +42,7 @@ import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -56,6 +57,7 @@ public class PrimaryNotRelocatedWhileBeingRecoveredTests extends OpenSearchAlloc .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.concurrent_source_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + 
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .build()); logger.info("Building initial routing table"); diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RebalanceAfterActiveTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RebalanceAfterActiveTests.java index 3567130bdaa..7c61be39074 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RebalanceAfterActiveTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/RebalanceAfterActiveTests.java @@ -46,6 +46,7 @@ import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -66,6 +67,7 @@ public class RebalanceAfterActiveTests extends OpenSearchAllocationTestCase { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .build(), diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java index f083d176860..6ae907a314e 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java +++ 
b/server/src/test/java/org/opensearch/cluster/routing/allocation/RoutingNodesIntegrityTests.java @@ -51,6 +51,7 @@ import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.opensearch.cluster.routing.ShardRoutingState.STARTED; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; +import static org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING; public class RoutingNodesIntegrityTests extends OpenSearchAllocationTestCase { private final Logger logger = LogManager.getLogger(IndexBalanceTests.class); @@ -58,6 +59,7 @@ public class RoutingNodesIntegrityTests extends OpenSearchAllocationTestCase { public void testBalanceAllNodesStarted() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); @@ -118,6 +120,7 @@ public class RoutingNodesIntegrityTests extends OpenSearchAllocationTestCase { public void testBalanceIncrementallyStartNodes() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1).build()); @@ -194,6 +197,7 @@ public class RoutingNodesIntegrityTests 
extends OpenSearchAllocationTestCase { public void testBalanceAllNodesStartedAddIndex() { AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put(CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 1) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(), 10) .put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java index ef3c5f5aa4a..87dba6df5b4 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/TenShardsOneReplicaRoutingTests.java @@ -43,6 +43,7 @@ import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.RoutingNodes; import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; +import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider; import org.opensearch.common.settings.Settings; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -61,6 +62,7 @@ public class TenShardsOneReplicaRoutingTests extends OpenSearchAllocationTestCas AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 10) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 10) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 10) 
.put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING.getKey(), "always") .put("cluster.routing.allocation.cluster_concurrent_rebalance", -1) .put("cluster.routing.allocation.balance.index", 0.0f) diff --git a/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java b/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java index ecb0ebd01a4..02c6b68fe7c 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/allocation/ThrottlingAllocationTests.java @@ -44,6 +44,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.cluster.routing.RecoverySource.SnapshotRecoverySource; import org.opensearch.cluster.routing.RoutingTable; @@ -71,6 +72,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Set; +import java.util.stream.Collectors; import static org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING; import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING; @@ -83,12 +85,13 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { private final Logger logger = LogManager.getLogger(ThrottlingAllocationTests.class); public void testPrimaryRecoveryThrottling() { - TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); TestSnapshotsInfoService snapshotsInfoService = new TestSnapshotsInfoService(); AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_recoveries", 3) 
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + randomIntBetween(3,10)) .build(), gatewayAllocator, snapshotsInfoService); logger.info("Building initial routing table"); @@ -143,6 +146,7 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 3) .put("cluster.routing.allocation.concurrent_source_recoveries", 3) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 3) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 3) .build(), gatewayAllocator, snapshotsInfoService); @@ -207,6 +211,7 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { .put("cluster.routing.allocation.node_concurrent_recoveries", 5) .put("cluster.routing.allocation.node_initial_primaries_recoveries", 5) .put("cluster.routing.allocation.cluster_concurrent_rebalance", 5) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 5) .build(); AllocationService strategy = createAllocationService(settings, gatewayAllocator, snapshotsInfoService); logger.info("Building initial routing table"); @@ -261,11 +266,12 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); } - public void testOutgoingThrottlesAllocation() { + public void testOutgoingThrottlesAllocationOldIndex() { TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); TestSnapshotsInfoService snapshotsInfoService = new TestSnapshotsInfoService(); AllocationService strategy = createAllocationService(Settings.builder() .put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", 1) + 
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 1) .build(), gatewayAllocator, snapshotsInfoService); logger.info("Building initial routing table"); @@ -274,7 +280,7 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) .build(); - ClusterState clusterState = createRecoveryStateAndInitializeAllocations(metadata, gatewayAllocator, snapshotsInfoService); + ClusterState clusterState = createRecoveryStateAndInitializeAllocations(metadata, gatewayAllocator, snapshotsInfoService, 1); logger.info("with one node, do reroute, only 1 should initialize"); clusterState = strategy.reroute(clusterState, "reroute"); @@ -299,6 +305,7 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(1)); assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), 0); logger.info("start initializing non-primary"); clusterState = startInitializingShardsAndReroute(strategy, clusterState); @@ -316,6 +323,7 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(0)); assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), 0); logger.info("start one more node"); clusterState = ClusterState.builder(clusterState) @@ -352,10 +360,356 @@ public class ThrottlingAllocationTests extends 
OpenSearchAllocationTestCase { assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); } + public void testOutgoingThrottlesAllocationNewIndex() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + TestSnapshotsInfoService snapshotsInfoService = new TestSnapshotsInfoService(); + AllocationService strategy = createAllocationService(Settings.builder() + .put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 1) + .build(), gatewayAllocator, snapshotsInfoService); + + logger.info("Building initial routing table"); + + Metadata metadata = Metadata.builder() + .put(IndexMetadata.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2)) + .build(); + + ClusterState clusterState = createRecoveryStateAndInitializeAllocations(metadata, gatewayAllocator, snapshotsInfoService, 5); + + logger.info("with one node, do reroute, only 1 should initialize"); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(0)); + assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(2)); + + logger.info("start initializing"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + + assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(2)); + + logger.info("start one more node, first non-primary should start being allocated"); + clusterState = ClusterState.builder(clusterState) + 
.nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node2"))).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(1)); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), 1); + + logger.info("start initializing non-primary"); + clusterState = startInitializingShardsAndReroute(strategy, clusterState); + assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(0)); + assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(1)); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + + logger.info("start one more node, initializing second non-primary"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(2)); + assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), 1); + + logger.info("start one more node"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4"))).build(); + clusterState = 
strategy.reroute(clusterState, "reroute"); + + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), 1); + + logger.info("move started non-primary to new node"); + AllocationService.CommandsResult commandsResult = strategy.reroute(clusterState, new AllocationCommands( + new MoveAllocationCommand("test", 0, "node2", "node4")), true, false); + assertEquals(commandsResult.explanations().explanations().size(), 1); + assertEquals(commandsResult.explanations().explanations().get(0).decisions().type(), Decision.Type.YES); + + clusterState = commandsResult.getClusterState(); + assertThat(clusterState.routingTable().shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(RELOCATING).size(), equalTo(1)); + assertThat(clusterState.routingTable().shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(clusterState.routingTable().shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), 1); + assertEquals(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node2"), 0); + } + + public void testNewIndexReplicaAllocationIncomingAndOutgoingLimitBreached() { + int primaries = randomIntBetween(5, 30); + int replicas = randomIntBetween(1, 2); + int replicasConcurrentRecoveries = randomIntBetween(1, 2); + int newIndexInitializingAfterReroute = replicasConcurrentRecoveries * 2; + verifyNewIndexReplicaAllocation(primaries, replicas, newIndexInitializingAfterReroute, + primaries * replicas - newIndexInitializingAfterReroute, + replicasConcurrentRecoveries, false); + } + + public void testNewIndexReplicaAllocationOutgoingLimitBreachedIncomingNotBreached() { + 
verifyNewIndexReplicaAllocation(5, 1,2, 3, 1, true); + } + + public void testNewIndexReplicaAllocationLimitNotBreached() { + int primaries = randomIntBetween(5, 30); + int newIndexReplicas = 1; + int replicasConcurrentRecoveries = primaries * newIndexReplicas + randomIntBetween(1, 10); + verifyNewIndexReplicaAllocation(primaries, newIndexReplicas, primaries * newIndexReplicas, 0, + replicasConcurrentRecoveries, false); + } + + private void verifyNewIndexReplicaAllocation(int newIndexPrimaries, int newIndexReplicas, + int newIndexInitializingAfterReroute, int newIndexUnassignedAfterReroute, + int replicaConcurrentRecoveriesLimit, + boolean additionalNodeAfterPrimaryAssignment) { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + Settings settings = Settings.builder().put("cluster.routing.allocation.node_concurrent_recoveries", 1) + .put("cluster.routing.allocation.node_initial_primaries_recoveries", 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), + replicaConcurrentRecoveriesLimit) + .build(); + AllocationService strategy = createAllocationService(settings, gatewayAllocator); + + DiscoveryNode node1 = newNode("node1"); + DiscoveryNode node2 = newNode("node2"); + + ClusterState clusterState = createPrimaryAndWaitForAllocation(strategy, node1, node2, settings); + + String[] indices = {"test1", "test2"}; + clusterState = increaseReplicaCountAndTriggerReroute(strategy, clusterState, indices, 1); + + logger.info("1 replica should be initializing now for the existing indices (we throttle to 1) on each node"); + assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node1"), equalTo(0)); + 
assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node2"), equalTo(0)); + + + logger.info("create a new index"); + clusterState = createNewIndexAndStartAllPrimaries(newIndexPrimaries, newIndexReplicas, strategy, clusterState); + + if (additionalNodeAfterPrimaryAssignment) { + clusterState = strategy.reroute(clusterState, "reroute"); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + } + clusterState = strategy.reroute(clusterState, "reroute"); + + IndexRoutingTable indexRouting = clusterState.routingTable().index("new_index"); + assertThat(indexRouting.shardsWithState(UNASSIGNED).size(), equalTo(newIndexUnassignedAfterReroute)); + assertThat(indexRouting.shardsWithState(INITIALIZING).size(), equalTo(newIndexInitializingAfterReroute)); + + int totalIncomingRecoveriesNewIndex = clusterState.getRoutingNodes().getInitialIncomingRecoveries("node1") + + clusterState.getRoutingNodes().getInitialIncomingRecoveries("node2") + + clusterState.getRoutingNodes().getInitialIncomingRecoveries("node3"); + + assertThat(totalIncomingRecoveriesNewIndex, equalTo(newIndexInitializingAfterReroute)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), equalTo(0)); + + clusterState = strategy.reroute(clusterState, "reroute"); + indexRouting = clusterState.routingTable().index("new_index"); + + assertThat(indexRouting.shardsWithState(UNASSIGNED).size(), equalTo(newIndexUnassignedAfterReroute)); + assertThat(indexRouting.shardsWithState(INITIALIZING).size(), equalTo(newIndexInitializingAfterReroute)); + + } + + public void testRecoveryCounts() { + TestGatewayAllocator gatewayAllocator = new TestGatewayAllocator(); + Settings settings = Settings.builder() + 
.put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 2) + .build(); + AllocationService strategy = createAllocationService(settings, gatewayAllocator); + + Metadata metadata = Metadata.builder().persistentSettings(settings).build(); + + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metadata).nodes(DiscoveryNodes.builder() + .add(newNode("node1")).add(newNode("node2"))).build(); + + logger.info("create a new index and start all primaries"); + clusterState = createNewIndexAndStartAllPrimaries(2, 2, strategy, clusterState); + + logger.info("Add a new node for all replica assignment. 4 replicas move to INIT after reroute."); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node3"))).build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + + assertThat(clusterState.routingTable().index("new_index").shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(INITIALIZING).size(), equalTo(4)); + + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node1"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node2"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node3"), equalTo(2)); + + 
assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node2"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node3"), equalTo(0)); + + logger.info("Exclude node1 and add node4. After this, primary shard on node1 moves to RELOCATING state and only " + + "non initial replica recoveries are impacted"); + settings = Settings.builder() + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_RECOVERIES_SETTING.getKey(), 1) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_PRIMARIES_RECOVERIES_SETTING.getKey(), 20) + .put(ThrottlingAllocationDecider.CLUSTER_ROUTING_ALLOCATION_NODE_INITIAL_REPLICAS_RECOVERIES_SETTING.getKey(), 2) + .put("cluster.routing.allocation.exclude._id", "node1") + .build(); + clusterState = ClusterState.builder(clusterState) + .nodes(DiscoveryNodes.builder(clusterState.nodes()).add(newNode("node4"))).build(); + strategy = createAllocationService(settings, gatewayAllocator); + + clusterState = strategy.reroute(clusterState, "reroute"); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(INITIALIZING).size(), equalTo(5)); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(RELOCATING).size(), equalTo(1)); + + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), equalTo(0)); + 
assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node4"), equalTo(1)); + + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node1"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node2"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node3"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node4"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node4"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node2"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node3"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node4"), equalTo(0)); + + logger.info("Start primary on node4. Now all replicas for that primary start getting accounted for from node4."); + + clusterState = strategy.applyStartedShards(clusterState, clusterState.routingTable().index("new_index"). + shardsWithState(INITIALIZING).stream().filter(routing->routing.primary() && routing.isRelocationTarget()). 
+ collect(Collectors.toList())); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(UNASSIGNED).size(), equalTo(0)); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(INITIALIZING).size(), equalTo(4)); + assertThat(clusterState.routingTable().index("new_index").shardsWithState(RELOCATING).size(), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getIncomingRecoveries("node4"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node1"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node2"), equalTo(1)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node3"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialIncomingRecoveries("node4"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getOutgoingRecoveries("node4"), equalTo(0)); + + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node1"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node2"), equalTo(2)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node3"), equalTo(0)); + assertThat(clusterState.getRoutingNodes().getInitialOutgoingRecoveries("node4"), equalTo(2)); + } + + /** Builds a cluster state holding indices "test1" and "test2" (5 shards, 0 replicas each) with {@code settings} stored as persistent settings, adds {@code node1} and {@code node2}, reroutes once, and then keeps applying started shards until no shard is left INITIALIZING. Returns the resulting cluster state. */ private ClusterState createPrimaryAndWaitForAllocation(AllocationService strategy, DiscoveryNode node1, + DiscoveryNode node2, Settings
settings) { + Metadata metaData = Metadata.builder() + .put(IndexMetadata.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(0)) + .put(IndexMetadata.builder("test2").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(0)) + .persistentSettings(settings) + .build(); + + RoutingTable initialRoutingTable = RoutingTable.builder() + .addAsNew(metaData.index("test1")) + .addAsNew(metaData.index("test2")) + .build(); + + ClusterState clusterState = ClusterState.builder(CLUSTER_NAME_SETTING + .getDefault(Settings.EMPTY)).metadata(metaData).routingTable(initialRoutingTable).build(); + + logger.info("adding two nodes and performing rerouting till all are allocated"); + clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() + .add(node1).add(node2)).build(); + clusterState = strategy.reroute(clusterState, "reroute"); + + /* Each pass starts every shard that is INITIALIZING at that moment; presumably several passes are needed because throttling caps how many shards initialize at once - TODO confirm. */ while (!clusterState.getRoutingNodes().shardsWithState(INITIALIZING).isEmpty()) { + clusterState = strategy.applyStartedShards(clusterState, clusterState.routingTable().shardsWithState(INITIALIZING)); + } + return clusterState; + } + + /** Adds an index named "new_index" with {@code newIndexPrimaries} shards and {@code newIndexReplicas} replicas to the metadata and routing table of {@code clusterState}, reroutes once, and applies started shards a single time to the "new_index" shards that are INITIALIZING after that reroute. Returns the updated cluster state. */ private ClusterState createNewIndexAndStartAllPrimaries(int newIndexPrimaries, int newIndexReplicas, AllocationService strategy, + ClusterState clusterState) { + Metadata metadata = Metadata.builder(clusterState.metadata()) + .put(IndexMetadata.builder("new_index").settings(settings(Version.CURRENT)) + .numberOfShards(newIndexPrimaries).numberOfReplicas(newIndexReplicas)) + .build(); + + RoutingTable updatedRoutingTable = RoutingTable.builder(clusterState.routingTable()) + .addAsNew(metadata.index("new_index")) + .build(); + + clusterState = ClusterState.builder(clusterState).metadata(metadata).routingTable(updatedRoutingTable).build(); + + logger.info("reroute, verify that primaries for the new index primary shards are allocated"); /* NOTE(review): despite the log text, this helper asserts nothing; the reroute result is only stored. */ + clusterState = strategy.reroute(clusterState, "reroute"); + + logger.info("Apply started shards for new index");
+ clusterState = strategy.applyStartedShards(clusterState, clusterState.routingTable().index("new_index"). + shardsWithState(INITIALIZING)); /* Single pass: only the "new_index" shards that are INITIALIZING right now are started. Presumably these are the primaries, since replicas normally wait for a started primary - TODO confirm. */ + return clusterState; + } + + /** Sets the replica count of {@code indices} to {@code replicaCount} in both the routing table and the metadata of {@code clusterState}, then reroutes once and returns the new cluster state. */ private ClusterState increaseReplicaCountAndTriggerReroute(AllocationService strategy, ClusterState clusterState, + String[] indices, int replicaCount) { + Metadata metaData; + logger.info("increasing the number of replicas to 1, and perform a reroute (to get the replicas allocation going)"); /* NOTE(review): the message hard-codes "1" although the count actually applied is replicaCount. */ + RoutingTable updatedRoutingTable = + RoutingTable.builder(clusterState.routingTable()).updateNumberOfReplicas(replicaCount, indices).build(); + metaData = Metadata.builder(clusterState.metadata()).updateNumberOfReplicas(replicaCount, indices).build(); + clusterState = ClusterState.builder(clusterState).routingTable(updatedRoutingTable).metadata(metaData).build(); + + clusterState = strategy.reroute(clusterState, "reroute"); + return clusterState; + } + private ClusterState createRecoveryStateAndInitializeAllocations( final Metadata metadata, final TestGatewayAllocator gatewayAllocator, final TestSnapshotsInfoService snapshotsInfoService + ) { /* Convenience overload: forwards to the four-argument variant with a null inputRecoveryType, in which case that variant draws the recovery type from randomInt(5). */ + return createRecoveryStateAndInitializeAllocations(metadata, gatewayAllocator, snapshotsInfoService, null); + + } + + private ClusterState createRecoveryStateAndInitializeAllocations( + final Metadata metadata, + final TestGatewayAllocator gatewayAllocator, + final TestSnapshotsInfoService snapshotsInfoService, final Integer inputRecoveryType ) { DiscoveryNode node1 = newNode("node1"); Metadata.Builder metadataBuilder = new Metadata.Builder(metadata); @@ -366,7 +720,8 @@ public class ThrottlingAllocationTests extends OpenSearchAllocationTestCase { for (ObjectCursor cursor: metadata.indices().values()) { Index index = cursor.value.getIndex(); IndexMetadata.Builder indexMetadataBuilder = IndexMetadata.builder(cursor.value); - final int recoveryType = randomInt(5); + + final int recoveryType = inputRecoveryType == null?
randomInt(5): inputRecoveryType.intValue(); if (recoveryType <= 4) { addInSyncAllocationIds(index, indexMetadataBuilder, gatewayAllocator, node1); }