Make allocation decisions at node level first for pending task optimization (#534) (#739)

* Make allocation decisions at node level first for pending task optimization

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Addressing review comments

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Fixing benchmark and adding debug mode tests

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Fixing typo in previous commit

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Moving test file to correct package

Signed-off-by: Ankit Jain <akjain@amazon.com>

* Addressing review comments

Signed-off-by: Ankit Jain <akjain@amazon.com>
This commit is contained in:
Ankit Jain 2021-05-21 02:32:18 +05:30 committed by GitHub
parent e90bde5a05
commit edbe5ae7e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 685 additions and 80 deletions

View File

@ -31,16 +31,6 @@
package org.opensearch.benchmark.routing.allocation; package org.opensearch.benchmark.routing.allocation;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;
import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Fork;
@ -52,8 +42,20 @@ import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State; import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup; import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.settings.Settings;
import java.util.Collections; import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Fork(3) @Fork(3)
@ -71,75 +73,103 @@ public class AllocationBenchmark {
// support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would // support to constrain the combinations of benchmark parameters and we do not want to rely on OptionsBuilder as each benchmark would
// need its own main method and we cannot execute more than one class with a main method per JAR. // need its own main method and we cannot execute more than one class with a main method per JAR.
@Param({ @Param({
// indices| shards| replicas| nodes // indices| shards| replicas| source| target| concurrentRecoveries
" 10| 1| 0| 1", " 10| 2| 0| 1| 1| 1|",
" 10| 3| 0| 1", " 10| 3| 0| 1| 1| 2|",
" 10| 10| 0| 1", " 10| 10| 0| 1| 1| 5|",
" 100| 1| 0| 1", " 100| 1| 0| 1| 1| 10|",
" 100| 3| 0| 1", " 100| 3| 0| 1| 1| 10|",
" 100| 10| 0| 1", " 100| 10| 0| 1| 1| 10|",
" 10| 1| 0| 10", " 10| 2| 0| 10| 10| 1|",
" 10| 3| 0| 10", " 10| 3| 0| 10| 5| 2|",
" 10| 10| 0| 10", " 10| 10| 0| 10| 5| 5|",
" 100| 1| 0| 10", " 100| 1| 0| 5| 10| 5|",
" 100| 3| 0| 10", " 100| 3| 0| 10| 5| 5|",
" 100| 10| 0| 10", " 100| 10| 0| 10| 20| 6|",
" 10| 1| 1| 10", " 10| 1| 1| 10| 10| 1|",
" 10| 3| 1| 10", " 10| 3| 1| 10| 3| 3|",
" 10| 10| 1| 10", " 10| 10| 1| 5| 12| 5|",
" 100| 1| 1| 10", " 100| 1| 1| 10| 10| 6|",
" 100| 3| 1| 10", " 100| 3| 1| 10| 5| 8|",
" 100| 10| 1| 10", " 100| 10| 1| 8| 17| 8|",
" 10| 1| 2| 10", " 10| 1| 2| 10| 10| 1|",
" 10| 3| 2| 10", " 10| 3| 2| 10| 5| 3|",
" 10| 10| 2| 10", " 10| 10| 2| 5| 10| 5|",
" 100| 1| 2| 10", " 100| 1| 2| 10| 8| 7|",
" 100| 3| 2| 10", " 100| 3| 2| 13| 17| 5|",
" 100| 10| 2| 10", " 100| 10| 2| 10| 20| 8|",
" 10| 1| 0| 50", " 10| 2| 1| 20| 20| 1|",
" 10| 3| 0| 50", " 10| 3| 1| 20| 30| 1|",
" 10| 10| 0| 50", " 10| 10| 1| 20| 10| 3|",
" 100| 1| 0| 50", " 100| 1| 1| 20| 5| 5|",
" 100| 3| 0| 50", " 100| 3| 1| 20| 23| 6|",
" 100| 10| 0| 50", " 100| 10| 1| 40| 20| 8|",
" 10| 1| 1| 50", " 10| 3| 2| 50| 30| 1|",
" 10| 3| 1| 50", " 10| 3| 2| 50| 25| 1|",
" 10| 10| 1| 50", " 10| 10| 1| 50| 33| 2|",
" 100| 1| 1| 50", " 100| 1| 1| 40| 50| 2|",
" 100| 3| 1| 50", " 100| 3| 1| 50| 70| 3|",
" 100| 10| 1| 50", " 100| 10| 1| 60| 50| 3|",
" 10| 1| 2| 50", " 10| 10| 2| 50| 50| 1|",
" 10| 3| 2| 50", " 10| 3| 2| 50| 30| 1|",
" 10| 10| 2| 50", " 10| 10| 2| 50| 40| 2|",
" 100| 1| 2| 50", " 100| 1| 2| 40| 50| 2|",
" 100| 3| 2| 50", " 100| 3| 2| 50| 30| 6|",
" 100| 10| 2| 50" }) " 100| 10| 2| 33| 55| 6|",
public String indicesShardsReplicasNodes = "10|1|0|1";
" 500| 60| 1| 100| 100| 12|",
" 500| 60| 1| 100| 40| 12|",
" 500| 60| 1| 40| 100| 12|",
" 50| 60| 1| 100| 100| 6|",
" 50| 60| 1| 100| 40| 6|",
" 50| 60| 1| 40| 100| 6|" })
public String indicesShardsReplicasSourceTargetRecoveries = "10|1|0|1|1|1";
public int numTags = 2; public int numTags = 2;
public int numZone = 3;
public int concurrentRecoveries;
public int numIndices;
public int numShards;
public int numReplicas;
public int sourceNodes;
public int targetNodes;
public int clusterConcurrentRecoveries;
private AllocationService strategy; private AllocationService initialClusterStrategy;
private AllocationService clusterExcludeStrategy;
private AllocationService clusterZoneAwareExcludeStrategy;
private ClusterState initialClusterState; private ClusterState initialClusterState;
@Setup @Setup
public void setUp() throws Exception { public void setUp() throws Exception {
final String[] params = indicesShardsReplicasNodes.split("\\|"); final String[] params = indicesShardsReplicasSourceTargetRecoveries.split("\\|");
numIndices = toInt(params[0]);
numShards = toInt(params[1]);
numReplicas = toInt(params[2]);
sourceNodes = toInt(params[3]);
targetNodes = toInt(params[4]);
concurrentRecoveries = toInt(params[5]);
int numIndices = toInt(params[0]); int totalShardCount = (numReplicas + 1) * numShards * numIndices;
int numShards = toInt(params[1]);
int numReplicas = toInt(params[2]);
int numNodes = toInt(params[3]);
strategy = Allocators.createAllocationService( initialClusterStrategy = Allocators.createAllocationService(
Settings.builder().put("cluster.routing.allocation.awareness.attributes", "tag").build() Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", "20")
.put("cluster.routing.allocation.exclude.tag", "tag_0")
.build()
); );
// We'll try to move nodes from tag_1 to tag_0
clusterConcurrentRecoveries = Math.min(sourceNodes, targetNodes) * concurrentRecoveries;
Metadata.Builder mb = Metadata.builder(); Metadata.Builder mb = Metadata.builder();
for (int i = 1; i <= numIndices; i++) { for (int i = 1; i <= numIndices; i++) {
mb.put( mb.put(
@ -155,15 +185,37 @@ public class AllocationBenchmark {
rb.addAsNew(metadata.index("test_" + i)); rb.addAsNew(metadata.index("test_" + i));
} }
RoutingTable routingTable = rb.build(); RoutingTable routingTable = rb.build();
DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
for (int i = 1; i <= numNodes; i++) {
nb.add(Allocators.newNode("node" + i, Collections.singletonMap("tag", "tag_" + (i % numTags))));
}
initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) initialClusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY))
.metadata(metadata) .metadata(metadata)
.routingTable(routingTable) .routingTable(routingTable)
.nodes(nb) .nodes(setUpClusterNodes(sourceNodes, targetNodes))
.build(); .build();
// Start all unassigned shards
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
while (initialClusterState.getRoutingNodes().hasUnassignedShards()) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
initialClusterState = initialClusterStrategy.reroute(initialClusterState, "reroute");
}
// Ensure all shards are started
while (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
initialClusterState = initialClusterStrategy.applyStartedShards(
initialClusterState,
initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
);
}
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size() == totalShardCount);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() == 0);
assert (initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size() == 0);
// make sure shards are only allocated on tag1
for (ShardRouting startedShard : initialClusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (initialClusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals(
"tag_1"
);
}
} }
private int toInt(String v) { private int toInt(String v) {
@ -171,15 +223,58 @@ public class AllocationBenchmark {
} }
@Benchmark @Benchmark
public ClusterState measureAllocation() { public ClusterState measureExclusionOnZoneAwareStartedShard() throws Exception {
ClusterState clusterState = initialClusterState; ClusterState clusterState = initialClusterState;
while (clusterState.getRoutingNodes().hasUnassignedShards()) { clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
clusterState = strategy.applyStartedShards( Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
return clusterState;
}
@Benchmark
public ClusterState measureShardRelocationComplete() throws Exception {
ClusterState clusterState = initialClusterState;
clusterZoneAwareExcludeStrategy = Allocators.createAllocationService(
Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(concurrentRecoveries))
.put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
.put("cluster.routing.allocation.exclude.tag", "tag_1")
.build()
);
clusterState = clusterZoneAwareExcludeStrategy.reroute(clusterState, "reroute");
while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
clusterState = clusterZoneAwareExcludeStrategy.applyStartedShards(
clusterState, clusterState,
clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING) clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING)
); );
clusterState = strategy.reroute(clusterState, "reroute"); }
for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
assert (clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag")).equals("tag_0");
} }
return clusterState; return clusterState;
} }
/**
 * Builds the discovery nodes for the benchmark cluster: {@code sourceNodes} nodes
 * tagged "tag_1" (where shards start) and {@code targetNodes} nodes tagged "tag_0"
 * (where shards are moved), each assigned a zone attribute round-robin over the zones.
 */
private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) {
    final DiscoveryNodes.Builder builder = DiscoveryNodes.builder();
    for (int node = 1; node <= sourceNodes; node++) {
        builder.add(Allocators.newNode("node_s_" + node, benchmarkNodeAttributes("tag_1", node)));
    }
    for (int node = 1; node <= targetNodes; node++) {
        builder.add(Allocators.newNode("node_t_" + node, benchmarkNodeAttributes("tag_0", node)));
    }
    return builder;
}

/** Attribute map for one benchmark node: a fixed tag plus a round-robin zone. */
private Map<String, String> benchmarkNodeAttributes(String tag, int ordinal) {
    final Map<String, String> attributes = new HashMap<>();
    attributes.put("tag", tag);
    attributes.put("zone", "zone_" + (ordinal % numZone));
    return attributes;
}
} }

View File

@ -57,6 +57,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders; import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
@ -244,6 +245,7 @@ public class ClusterModule extends AbstractModule {
addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider()); addAllocationDecider(deciders, new RebalanceOnlyWhenActiveAllocationDecider());
addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ClusterRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new ConcurrentRebalanceAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new ConcurrentRecoveriesAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings)); addAllocationDecider(deciders, new EnableAllocationDecider(settings, clusterSettings));
addAllocationDecider(deciders, new NodeVersionAllocationDecider()); addAllocationDecider(deciders, new NodeVersionAllocationDecider());
addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider()); addAllocationDecider(deciders, new SnapshotInProgressAllocationDecider());

View File

@ -256,6 +256,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
private final Metadata metadata; private final Metadata metadata;
private final float avgShardsPerNode; private final float avgShardsPerNode;
private final NodeSorter sorter; private final NodeSorter sorter;
private final Set<RoutingNode> inEligibleTargetNode;
public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
this.logger = logger; this.logger = logger;
@ -267,6 +268,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size(); avgShardsPerNode = ((float) metadata.getTotalNumberOfShards()) / routingNodes.size();
nodes = Collections.unmodifiableMap(buildModelFromAssigned()); nodes = Collections.unmodifiableMap(buildModelFromAssigned());
sorter = newNodeSorter(); sorter = newNodeSorter();
inEligibleTargetNode = new HashSet<>();
} }
/** /**
@ -632,6 +634,16 @@ public class BalancedShardsAllocator implements ShardsAllocator {
return indices; return indices;
} }
/**
 * Marks {@code targetNode} as an ineligible allocation target when the node-level
 * deciders do not return a YES decision for it, so later shard iterations can
 * skip it without re-running the deciders.
 */
private void checkAndAddInEligibleTargetNode(RoutingNode targetNode) {
    final Decision nodeDecision = allocation.deciders().canAllocateAnyShardToNode(targetNode, allocation);
    if (Decision.Type.YES != nodeDecision.type()) {
        inEligibleTargetNode.add(targetNode);
    }
}
/** /**
* Move started shards that can not be allocated to a node anymore * Move started shards that can not be allocated to a node anymore
* *
@ -646,8 +658,37 @@ public class BalancedShardsAllocator implements ShardsAllocator {
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards. // offloading the shards.
// Trying to eliminate target nodes so that we do not unnecessarily iterate over source nodes
// when no target is eligible
for (ModelNode currentNode : sorter.modelNodes) {
checkAndAddInEligibleTargetNode(currentNode.getRoutingNode());
}
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) { for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
//Verify if the cluster concurrent recoveries have been reached.
if (allocation.deciders().canMoveAnyShard(allocation).type() != Decision.Type.YES) {
logger.info("Cannot move any shard in the cluster due to cluster concurrent recoveries getting breached"
+ ". Skipping shard iteration");
return;
}
//Early terminate node interleaved shard iteration when no eligible target nodes are available
if(sorter.modelNodes.length == inEligibleTargetNode.size()) {
logger.info("Cannot move any shard in the cluster as there is no node on which shards can be allocated"
+ ". Skipping shard iteration");
return;
}
ShardRouting shardRouting = it.next(); ShardRouting shardRouting = it.next();
// Verify if the shard is allowed to move if outgoing recovery on the node hosting the primary shard
// is not being throttled.
Decision canMoveAwayDecision = allocation.deciders().canMoveAway(shardRouting, allocation);
if(canMoveAwayDecision.type() != Decision.Type.YES) {
if (logger.isDebugEnabled())
logger.debug("Cannot move away shard [{}] Skipping this shard", shardRouting);
continue;
}
final MoveDecision moveDecision = decideMove(shardRouting); final MoveDecision moveDecision = decideMove(shardRouting);
if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) { if (moveDecision.isDecisionTaken() && moveDecision.forceMove()) {
final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId()); final ModelNode sourceNode = nodes.get(shardRouting.currentNodeId());
@ -660,6 +701,11 @@ public class BalancedShardsAllocator implements ShardsAllocator {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode()); logger.trace("Moved shard [{}] to node [{}]", shardRouting, targetNode.getRoutingNode());
} }
// Verifying if this node can be considered ineligible for further iterations
if (targetNode != null) {
checkAndAddInEligibleTargetNode(targetNode.getRoutingNode());
}
} else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) { } else if (moveDecision.isDecisionTaken() && moveDecision.canRemain() == false) {
logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id()); logger.trace("[{}][{}] can't move", shardRouting.index(), shardRouting.id());
} }
@ -704,9 +750,22 @@ public class BalancedShardsAllocator implements ShardsAllocator {
RoutingNode targetNode = null; RoutingNode targetNode = null;
final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null; final List<NodeAllocationResult> nodeExplanationMap = explain ? new ArrayList<>() : null;
int weightRanking = 0; int weightRanking = 0;
int targetNodeProcessed = 0;
for (ModelNode currentNode : sorter.modelNodes) { for (ModelNode currentNode : sorter.modelNodes) {
if (currentNode != sourceNode) { if (currentNode != sourceNode) {
RoutingNode target = currentNode.getRoutingNode(); RoutingNode target = currentNode.getRoutingNode();
if(!explain && inEligibleTargetNode.contains(target))
continue;
// don't use canRebalance as we want hard filtering rules to apply. See #17698
if (!explain) {
// If we cannot allocate any shard to node marking it in eligible
Decision nodeLevelAllocationDecision = allocation.deciders().canAllocateAnyShardToNode(target, allocation);
if (nodeLevelAllocationDecision.type() != Decision.Type.YES) {
inEligibleTargetNode.add(currentNode.getRoutingNode());
continue;
}
}
targetNodeProcessed++;
// don't use canRebalance as we want hard filtering rules to apply. See #17698 // don't use canRebalance as we want hard filtering rules to apply. See #17698
Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation); Decision allocationDecision = allocation.deciders().canAllocate(shardRouting, target, allocation);
if (explain) { if (explain) {

View File

@ -128,4 +128,32 @@ public abstract class AllocationDecider {
return decision; return decision;
} }
} }
/**
 * Returns a {@link Decision} on whether the given shard can be moved away from
 * the node it is currently allocated on, for the given {@link RoutingAllocation}.
 * The default is {@link Decision#ALWAYS}, i.e. deciders that do not override this
 * method never veto a shard move.
 */
public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
    return Decision.ALWAYS;
}
/**
 * Returns a {@link Decision} on whether any shard in the cluster can currently be
 * moved between nodes, for the given {@link RoutingAllocation}. The default is
 * {@link Decision#ALWAYS}, i.e. deciders that do not override this method never
 * block cluster-wide shard movement.
 */
public Decision canMoveAnyShard(RoutingAllocation allocation) {
    return Decision.ALWAYS;
}
/**
 * Returns a {@link Decision} on whether any shard can be allocated to the given
 * {@link RoutingNode}. The default is {@link Decision#ALWAYS}.
 * Implementations that override this behaviour must return a {@link Decision}
 * that lets the caller determine whether to skip iterating over the remaining
 * deciders for this node.
 */
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
    return Decision.ALWAYS;
}
} }

View File

@ -64,7 +64,7 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canRebalance(shardRouting, allocation); Decision decision = allocationDecider.canRebalance(shardRouting, allocation);
// short track if a NO is returned. // short track if a NO is returned.
if (decision == Decision.NO) { if (decision == Decision.NO) {
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -91,7 +91,7 @@ public class AllocationDeciders extends AllocationDecider {
shardRouting, node.node(), allocationDecider.getClass().getSimpleName()); shardRouting, node.node(), allocationDecider.getClass().getSimpleName());
} }
// short circuit only if debugging is not enabled // short circuit only if debugging is not enabled
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -120,7 +120,7 @@ public class AllocationDeciders extends AllocationDecider {
logger.trace("Shard [{}] can not remain on node [{}] due to [{}]", logger.trace("Shard [{}] can not remain on node [{}] due to [{}]",
shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName()); shardRouting, node.nodeId(), allocationDecider.getClass().getSimpleName());
} }
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -139,7 +139,7 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canAllocate(indexMetadata, node, allocation); Decision decision = allocationDecider.canAllocate(indexMetadata, node, allocation);
// short track if a NO is returned. // short track if a NO is returned.
if (decision == Decision.NO) { if (decision == Decision.NO) {
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -158,7 +158,7 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetadata, node, allocation); Decision decision = allocationDecider.shouldAutoExpandToNode(indexMetadata, node, allocation);
// short track if a NO is returned. // short track if a NO is returned.
if (decision == Decision.NO) { if (decision == Decision.NO) {
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -177,7 +177,7 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canAllocate(shardRouting, allocation); Decision decision = allocationDecider.canAllocate(shardRouting, allocation);
// short track if a NO is returned. // short track if a NO is returned.
if (decision == Decision.NO) { if (decision == Decision.NO) {
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -196,7 +196,7 @@ public class AllocationDeciders extends AllocationDecider {
Decision decision = allocationDecider.canRebalance(allocation); Decision decision = allocationDecider.canRebalance(allocation);
// short track if a NO is returned. // short track if a NO is returned.
if (decision == Decision.NO) { if (decision == Decision.NO) {
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);
@ -224,7 +224,70 @@ public class AllocationDeciders extends AllocationDecider {
logger.trace("Shard [{}] can not be forcefully allocated to node [{}] due to [{}].", logger.trace("Shard [{}] can not be forcefully allocated to node [{}] due to [{}].",
shardRouting.shardId(), node.nodeId(), decider.getClass().getSimpleName()); shardRouting.shardId(), node.nodeId(), decider.getClass().getSimpleName());
} }
if (!allocation.debugDecision()) { if (allocation.debugDecision() == false) {
return decision;
} else {
ret.add(decision);
}
} else {
addDecision(ret, decision, allocation);
}
}
return ret;
}
/**
 * Combines every registered decider's node-level allocation decision for {@code node}.
 * Short-circuits on the first THROTTLE/NO decision unless debug mode is on, in which
 * case all decisions are accumulated into the returned {@link Decision.Multi}.
 */
@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
    final Decision.Multi combined = new Decision.Multi();
    for (AllocationDecider decider : allocations) {
        final Decision current = decider.canAllocateAnyShardToNode(node, allocation);
        if (current.type().canPremptivelyReturn() == false) {
            addDecision(combined, current, allocation);
            continue;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Shard can not be allocated on node [{}] due to [{}]", node.nodeId(), decider.getClass().getSimpleName());
        }
        if (allocation.debugDecision()) {
            combined.add(current);
        } else {
            // not debugging: the first blocking decision wins outright
            return current;
        }
    }
    return combined;
}
/**
 * Combines every registered decider's verdict on moving {@code shardRouting} off its
 * current node. Short-circuits on the first THROTTLE/NO decision unless debug mode is
 * on, in which case all decisions are accumulated into the returned {@link Decision.Multi}.
 */
@Override
public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
    final Decision.Multi combined = new Decision.Multi();
    for (AllocationDecider decider : allocations) {
        final Decision current = decider.canMoveAway(shardRouting, allocation);
        if (current.type().canPremptivelyReturn() == false) {
            addDecision(combined, current, allocation);
            continue;
        }
        if (logger.isTraceEnabled()) {
            logger.trace("Shard [{}] can not be moved away due to [{}]", shardRouting, decider.getClass().getSimpleName());
        }
        if (allocation.debugDecision()) {
            combined.add(current);
        } else {
            // not debugging: the first blocking decision wins outright
            return current;
        }
    }
    return combined;
}
@Override
public Decision canMoveAnyShard(RoutingAllocation allocation) {
Decision.Multi ret = new Decision.Multi();
for (AllocationDecider decider : allocations) {
Decision decision = decider.canMoveAnyShard(allocation);
// short track if a NO is returned.
if (decision.type().canPremptivelyReturn()) {
if (allocation.debugDecision() == false) {
return decision; return decision;
} else { } else {
ret.add(decision); ret.add(decision);

View File

@ -0,0 +1,82 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.cluster.routing.allocation.decider;
import org.opensearch.cluster.routing.RoutingNode;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
 * This {@link AllocationDecider} limits the number of concurrently relocating
 * (recovering) shards across the whole cluster and throttles further shard
 * movements once the configured threshold is reached.
 * <p>
 * The limit can be changed in real time via the cluster update settings API using
 * <code>cluster.routing.allocation.cluster_concurrent_recoveries</code>. Iff this
 * setting is set to <code>-1</code> the number of cluster concurrent recoveries
 * is unlimited.
 */
public class ConcurrentRecoveriesAllocationDecider extends AllocationDecider {

    private static final Logger logger = LogManager.getLogger(ConcurrentRecoveriesAllocationDecider.class);

    public static final String NAME = "cluster_concurrent_recoveries";

    public static final Setting<Integer> CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING =
        Setting.intSetting("cluster.routing.allocation.cluster_concurrent_recoveries", -1, -1,
            Property.Dynamic, Property.NodeScope);

    // -1 means unlimited; updated dynamically via the settings consumer registered below.
    private volatile int clusterConcurrentRecoveries;

    public ConcurrentRecoveriesAllocationDecider(Settings settings, ClusterSettings clusterSettings) {
        this.clusterConcurrentRecoveries = CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING.get(settings);
        // Fixed copy-paste bug: log the actual setting name (was "cluster_concurrent_rebalance").
        logger.debug("using [cluster_concurrent_recoveries] with [{}]", clusterConcurrentRecoveries);
        clusterSettings.addSettingsUpdateConsumer(CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING,
            this::setClusterConcurrentRecoveries);
    }

    // Renamed from setClusterConcurrentRebalance: this setter updates the recoveries
    // limit, not rebalance; private, so the rename is invisible to callers.
    private void setClusterConcurrentRecoveries(int clusterConcurrentRecoveries) {
        this.clusterConcurrentRecoveries = clusterConcurrentRecoveries;
    }

    /**
     * Returns THROTTLE once the cluster-wide relocating shard count has reached the
     * configured limit, YES otherwise (and always YES when the limit is -1).
     */
    @Override
    public Decision canMoveAnyShard(RoutingAllocation allocation) {
        if (clusterConcurrentRecoveries == -1) {
            return allocation.decision(Decision.YES, NAME, "undefined cluster concurrent recoveries");
        }
        int relocatingShards = allocation.routingNodes().getRelocatingShardCount();
        if (relocatingShards >= clusterConcurrentRecoveries) {
            return allocation.decision(Decision.THROTTLE, NAME,
                "too many shards are concurrently relocating [%d], limit: [%d] cluster setting [%s=%d]",
                relocatingShards, clusterConcurrentRecoveries, CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING.getKey(),
                clusterConcurrentRecoveries);
        }
        return allocation.decision(Decision.YES, NAME,
            "below threshold [%d] for concurrent recoveries, current relocating shard count [%d]",
            clusterConcurrentRecoveries, relocatingShards);
    }

    /** Per-shard allocation delegates to the cluster-wide recovery-count check. */
    @Override
    public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
        return canMoveAnyShard(allocation);
    }
}

View File

@ -137,6 +137,10 @@ public abstract class Decision implements ToXContent, Writeable {
return false; return false;
} }
// True when this decision can short-circuit further decider evaluation:
// both THROTTLE and NO mean the operation cannot proceed now, so callers
// may return early without consulting the remaining deciders.
// NOTE(review): "Premptively" is a typo for "Preemptively", but the method
// is public API — renaming would break callers elsewhere in the codebase.
public boolean canPremptivelyReturn() {
return this == THROTTLE || this == NO;
}
} }
/** /**

View File

@ -147,6 +147,12 @@ public class FilterAllocationDecider extends AllocationDecider {
return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters"); return allocation.decision(Decision.YES, NAME, "node passes include/exclude/require filters");
} }
@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
// Node-level pre-check: only a hard cluster-filter rejection (NO) blocks the node
// outright; any other outcome (null / YES) defers to the per-shard filters.
Decision decision = shouldClusterFilter(node.node(), allocation);
// `decision == Decision.NO` is false for null, so the explicit null check was redundant.
return decision == Decision.NO ? decision : Decision.ALWAYS;
}
private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) { private Decision shouldFilter(ShardRouting shardRouting, DiscoveryNode node, RoutingAllocation allocation) {
Decision decision = shouldClusterFilter(node, allocation); Decision decision = shouldClusterFilter(node, allocation);
if (decision != null) return decision; if (decision != null) return decision;

View File

@ -224,4 +224,44 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
assert initializingShard.initializing(); assert initializingShard.initializing();
return initializingShard; return initializingShard;
} }
@Override
public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
// A replica recovers from the node holding its active primary, so the outgoing
// recovery counter of that node is the one that throttles the move; a primary
// recovers from its own current node.
int outgoingRecoveries = 0;
if (!shardRouting.primary()) {
    ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
    if (primaryShard == null) {
        // Guard against NPE: without an active primary the replica has no
        // recovery source, mirroring the check in canAllocate.
        return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active");
    }
    outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId());
} else {
    outgoingRecoveries = allocation.routingNodes().getOutgoingRecoveries(shardRouting.currentNodeId());
}
if (outgoingRecoveries >= concurrentOutgoingRecoveries) {
    return allocation.decision(
        THROTTLE, NAME,
        "too many outgoing shards are currently recovering [%d], limit: [%d] cluster setting [%s=%d]",
        outgoingRecoveries, concurrentOutgoingRecoveries,
        CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_OUTGOING_RECOVERIES_SETTING.getKey(),
        concurrentOutgoingRecoveries
    );
} else {
    return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d]", outgoingRecoveries,
        concurrentOutgoingRecoveries);
}
}
@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
// Node-level check independent of any particular shard: throttle purely on the
// target node's current incoming-recovery count versus the configured limit.
final int currentIncoming = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentIncoming < concurrentIncomingRecoveries) {
    return allocation.decision(YES, NAME, "below shard recovery limit of incoming: [%d < %d]", currentIncoming,
        concurrentIncomingRecoveries);
}
return allocation.decision(
    THROTTLE, NAME,
    "too many incoming shards are currently recovering [%d], limit: [%d] cluster setting [%s=%d]",
    currentIncoming, concurrentIncomingRecoveries,
    CLUSTER_ROUTING_ALLOCATION_NODE_CONCURRENT_INCOMING_RECOVERIES_SETTING.getKey(),
    concurrentIncomingRecoveries
);
}
} }

View File

@ -67,6 +67,7 @@ import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocat
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
@ -226,6 +227,7 @@ public final class ClusterSettings extends AbstractScopedSettings {
BreakerSettings.CIRCUIT_BREAKER_TYPE, BreakerSettings.CIRCUIT_BREAKER_TYPE,
ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING, ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE_SETTING,
ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING, ConcurrentRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_REBALANCE_SETTING,
ConcurrentRecoveriesAllocationDecider.CLUSTER_ROUTING_ALLOCATION_CLUSTER_CONCURRENT_RECOVERIES_SETTING,
DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING, DanglingIndicesState.AUTO_IMPORT_DANGLING_INDICES_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING,
EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING, EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING,

View File

@ -43,6 +43,7 @@ import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.ConcurrentRebalanceAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ConcurrentRecoveriesAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider; import org.opensearch.cluster.routing.allocation.decider.FilterAllocationDecider;
@ -219,6 +220,7 @@ public class ClusterModuleTests extends ModuleTestCase {
RebalanceOnlyWhenActiveAllocationDecider.class, RebalanceOnlyWhenActiveAllocationDecider.class,
ClusterRebalanceAllocationDecider.class, ClusterRebalanceAllocationDecider.class,
ConcurrentRebalanceAllocationDecider.class, ConcurrentRebalanceAllocationDecider.class,
ConcurrentRecoveriesAllocationDecider.class,
EnableAllocationDecider.class, EnableAllocationDecider.class,
NodeVersionAllocationDecider.class, NodeVersionAllocationDecider.class,
SnapshotInProgressAllocationDecider.class, SnapshotInProgressAllocationDecider.class,

View File

@ -103,6 +103,18 @@ public class AllocationDecidersTests extends OpenSearchTestCase {
public Decision canRebalance(RoutingAllocation allocation) { public Decision canRebalance(RoutingAllocation allocation) {
return Decision.YES; return Decision.YES;
} }
// These anonymous-class methods override AllocationDecider hooks added for
// node-level pending-task optimization; @Override lets the compiler verify
// the signatures stay in sync with the base class.
@Override
public Decision canMoveAway(ShardRouting shardRouting, RoutingAllocation allocation) {
return Decision.YES;
}

@Override
public Decision canMoveAnyShard(RoutingAllocation allocation) {
return Decision.YES;
}

@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
return Decision.YES;
}
})); }));
ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build(); ClusterState clusterState = ClusterState.builder(new ClusterName("test")).build();
@ -125,6 +137,9 @@ public class AllocationDecidersTests extends OpenSearchTestCase {
verify(deciders.canRemain(shardRouting, routingNode, allocation), matcher); verify(deciders.canRemain(shardRouting, routingNode, allocation), matcher);
verify(deciders.canForceAllocatePrimary(shardRouting, routingNode, allocation), matcher); verify(deciders.canForceAllocatePrimary(shardRouting, routingNode, allocation), matcher);
verify(deciders.shouldAutoExpandToNode(idx, null, allocation), matcher); verify(deciders.shouldAutoExpandToNode(idx, null, allocation), matcher);
verify(deciders.canMoveAway(shardRouting, allocation), matcher);
verify(deciders.canMoveAnyShard(allocation), matcher);
verify(deciders.canAllocateAnyShardToNode(routingNode, allocation), matcher);
} }
private void verify(Decision decision, Matcher<Collection<? extends Decision>> matcher) { private void verify(Decision decision, Matcher<Collection<? extends Decision>> matcher) {

View File

@ -0,0 +1,207 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/
package org.opensearch.cluster.routing.allocation.decider;
import org.opensearch.Version;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.OpenSearchAllocationTestCase;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.settings.Settings;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
/**
 * Tests that {@code cluster.routing.allocation.cluster_concurrent_recoveries} caps the
 * number of concurrently relocating shards when shards are drained from "source" nodes
 * (tag_1) to "target" nodes (tag_0) via exclude filters.
 */
public class ConcurrentRecoveriesAllocationDeciderTests extends OpenSearchAllocationTestCase {

    public void testClusterConcurrentRecoveries() {
        int primaryShards = 5, replicaShards = 1, numberIndices = 12;
        int clusterConcurrentRecoveries;
        int nodeConcurrentRecoveries = 4;
        // Initial strategy excludes tag_0, so all shards start on the tag_1 ("source") nodes.
        AllocationService initialStrategy = createAllocationService(
            Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
                .put("cluster.routing.allocation.node_initial_primaries_recoveries", "8")
                .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
                .put("cluster.routing.allocation.exclude.tag", "tag_0").build());
        AllocationService excludeStrategy = null;
        logger.info("Building initial routing table");
        Metadata.Builder metadataBuilder = Metadata.builder();
        for (int i = 0; i < numberIndices; i++) {
            metadataBuilder.put(IndexMetadata.builder("test_" + i).settings(settings(Version.CURRENT)).numberOfShards(primaryShards)
                .numberOfReplicas(replicaShards));
        }
        RoutingTable.Builder initialRoutingTableBuilder = RoutingTable.builder();
        Metadata metadata = metadataBuilder.build();
        for (int i = 0; i < numberIndices; i++) {
            initialRoutingTableBuilder.addAsNew(metadata.index("test_" + i));
        }
        RoutingTable routingTable = initialRoutingTableBuilder.build();
        logger.info("--> adding nodes and starting shards");
        // (source node count, target node count) combinations exercised below.
        // Diamond operator avoids the raw-type unchecked warnings of `new Tuple(...)`.
        List<Tuple<Integer, Integer>> srcTargetNodes = Collections.unmodifiableList(Arrays.asList(
            new Tuple<>(10, 4),
            new Tuple<>(4, 10),
            new Tuple<>(10, 10))
        );
        for (Tuple<Integer, Integer> srcTargetNode : srcTargetNodes) {
            int srcNodes = srcTargetNode.v1();
            int targetNodes = srcTargetNode.v2();
            logger.info("Setting up tests for src node {} and target node {}", srcNodes, targetNodes);
            ClusterState clusterState = ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).metadata(metadata)
                .routingTable(routingTable).nodes(setUpClusterNodes(srcNodes, targetNodes)).build();
            clusterState = initialStrategy.reroute(clusterState, "reroute");
            // Initialize shards
            logger.info("--> Starting primary shards");
            while (clusterState.getRoutingNodes().hasUnassignedShards()) {
                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
            }
            logger.info("--> Starting replica shards");
            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
            }
            assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED).size(),
                equalTo((replicaShards + 1) * primaryShards * numberIndices));
            assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size(), equalTo(0));
            assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(), equalTo(0));

            // Phase 1: cluster limit equals what the per-node limits already allow;
            // relocations must saturate exactly at the cluster limit.
            clusterConcurrentRecoveries = Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries;
            excludeStrategy = createAllocationService(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
                .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
                .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
                .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
            for (int counter = 0; counter < 3; counter++) {
                logger.info("--> Performing a reroute ");
                clusterState = excludeStrategy.reroute(clusterState, "reroute");
                assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
                    equalTo(clusterConcurrentRecoveries));
                for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
                    assertThat(clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag"),
                        equalTo("tag_1"));
                }
            }
            // Ensure all shards are started
            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
            }

            // Phase 2: cluster limit strictly below the per-node capacity; the cluster
            // setting must become the binding constraint.
            clusterConcurrentRecoveries = clusterConcurrentRecoveries - randomInt(5);
            excludeStrategy = createAllocationService(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
                .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
                .put("cluster.routing.allocation.cluster_concurrent_recoveries", String.valueOf(clusterConcurrentRecoveries))
                .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
            for (int counter = 0; counter < 3; counter++) {
                logger.info("--> Performing a reroute ");
                clusterState = excludeStrategy.reroute(clusterState, "reroute");
                assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
                    equalTo(clusterConcurrentRecoveries));
                for (ShardRouting startedShard : clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.STARTED)) {
                    assertThat(clusterState.getRoutingNodes().node(startedShard.currentNodeId()).node().getAttributes().get("tag"),
                        equalTo("tag_1"));
                }
            }
            // Ensure all shards are started
            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
            }

            // Phase 3: cluster limit removed (defaults to -1); only the per-node limits apply.
            logger.info("--> Disabling cluster_concurrent_recoveries and re-routing ");
            clusterConcurrentRecoveries = Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries;
            for (int counter = 0; counter < 3; counter++) {
                logger.info("--> Performing a reroute ");
                excludeStrategy = createAllocationService(
                    Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
                        .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
                        .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
                clusterState = excludeStrategy.reroute(clusterState, "reroute");
                // When srcNodes < targetNodes, relocations go beyond the
                // Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries limit because
                // outgoing recoveries happen towards target nodes, which anyway don't get
                // throttled on incoming recoveries.
                if (srcNodes >= targetNodes) {
                    assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
                        equalTo(clusterConcurrentRecoveries));
                } else {
                    assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
                        greaterThanOrEqualTo(clusterConcurrentRecoveries));
                }
            }
            // Ensure all shards are started
            while (clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.INITIALIZING).size() > 0) {
                clusterState = startInitializingShardsAndReroute(initialStrategy, clusterState);
            }

            // Phase 4: cluster limit raised above per-node capacity; per-node limits bind.
            logger.info("--> Bumping cluster_concurrent_recoveries up and re-routing ");
            clusterConcurrentRecoveries = clusterConcurrentRecoveries + randomInt(5);
            int expectedClusterConcurrentRecoveries = Math.min(srcNodes, targetNodes) * nodeConcurrentRecoveries;
            for (int counter = 0; counter < 3; counter++) {
                logger.info("--> Performing a reroute ");
                excludeStrategy = createAllocationService(Settings.builder().put("cluster.routing.allocation.awareness.attributes", "zone")
                    .put("cluster.routing.allocation.node_concurrent_recoveries", String.valueOf(nodeConcurrentRecoveries))
                    .put("cluster.routing.allocation.exclude.tag", "tag_1").build());
                clusterState = excludeStrategy.reroute(clusterState, "reroute");
                assertThat(clusterState.getRoutingNodes().shardsWithState(ShardRoutingState.RELOCATING).size(),
                    equalTo(expectedClusterConcurrentRecoveries));
            }
        }
    }

    /**
     * Builds a node set of {@code sourceNodes} nodes tagged {@code tag_1} and
     * {@code targetNodes} nodes tagged {@code tag_0}, alternating zone attributes.
     */
    private DiscoveryNodes.Builder setUpClusterNodes(int sourceNodes, int targetNodes) {
        DiscoveryNodes.Builder nb = DiscoveryNodes.builder();
        for (int i = 1; i <= sourceNodes; i++) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("tag", "tag_" + 1);
            attributes.put("zone", "zone_" + (i % 2));
            nb.add(newNode("node_s_" + i, attributes));
        }
        for (int j = 1; j <= targetNodes; j++) {
            Map<String, String> attributes = new HashMap<>();
            attributes.put("tag", "tag_" + 0);
            attributes.put("zone", "zone_" + (j % 2));
            nb.add(newNode("node_t_" + j, attributes));
        }
        return nb;
    }
}