diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
index 9a25c45ea35..48a9cd111fc 100644
--- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
+++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
@@ -60,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@@ -205,15 +206,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
*
* weight(node, index) = weightindex(node, index) + weightnode(node, index)
*/
- public static class WeightFunction {
+ private static class WeightFunction {
private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
-
- public WeightFunction(float indexBalance, float shardBalance) {
+ WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
@@ -224,21 +224,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
this.shardBalance = shardBalance;
}
- public float weight(Balancer balancer, ModelNode node, String index) {
- return weight(balancer, node, index, 0);
- }
-
- public float weightShardAdded(Balancer balancer, ModelNode node, String index) {
- return weight(balancer, node, index, 1);
- }
-
- public float weightShardRemoved(Balancer balancer, ModelNode node, String index) {
- return weight(balancer, node, index, -1);
- }
-
- private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
- final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
- final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
+ float weight(Balancer balancer, ModelNode node, String index) {
+ final float weightShard = node.numShards() - balancer.avgShardsPerNode();
+ final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
}
}
@@ -377,9 +365,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
assert currentNode != null : "currently assigned node could not be found";
// balance the shard, if a better node can be found
- final float currentWeight = sorter.weight(currentNode);
- final AllocationDeciders deciders = allocation.deciders();
final String idxName = shard.getIndexName();
+ final float currentWeight = weight.weight(this, currentNode, idxName);
+ final AllocationDeciders deciders = allocation.deciders();
Type rebalanceDecisionType = Type.NO;
ModelNode assignedNode = null;
List> betterBalanceNodes = new ArrayList<>();
@@ -394,7 +382,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
// this is a comparison of the number of shards on this node to the number of shards
// that should be on each node on average (both taking the cluster as a whole into account
// as well as shards per index)
- final float nodeWeight = sorter.weight(node);
+ final float nodeWeight = weight.weight(this, node, idxName);
// if the node we are examining has a worse (higher) weight than the node the shard is
// assigned to, then there is no way moving the shard to the node with the worse weight
// can make the balance of the cluster better, so we check for that here
@@ -408,12 +396,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
// more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless
// the gains make it worth it, as defined by the threshold
boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
- // simulate the weight of the node if we were to relocate the shard to it
- float weightWithShardAdded = weight.weightShardAdded(this, node, idxName);
// calculate the delta of the weights of the two nodes if we were to add the shard to the
// node in question and move it away from the node that currently holds it.
- float proposedDelta = weightWithShardAdded - weight.weightShardRemoved(this, currentNode, idxName);
- boolean betterWeightWithShardAdded = proposedDelta < currentDelta;
+ boolean betterWeightWithShardAdded = nodeWeight + 1.0f < currentWeight;
rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
// if the simulated weight delta with the shard moved away is better than the weight delta
// with the shard remaining on the current node, and we are allowed to allocate to the
@@ -538,9 +523,18 @@ public class BalancedShardsAllocator implements ShardsAllocator {
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
- /* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
- * a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */
- if (tryRelocateShard(minNode, maxNode, index, delta)) {
+ if (delta <= 1.0f) {
+ /*
+ * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the
+ * balance if we only achieve the same delta the relocation is useless
+ *
+ * NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We
+ * already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never
+ * hit this case anyway.
+ */
+ logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]",
+ maxNode.getNodeId(), minNode.getNodeId());
+ } else if (tryRelocateShard(minNode, maxNode, index)) {
/*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
* we could just find the place to insert linearly but the win might be minor
@@ -666,12 +660,12 @@ public class BalancedShardsAllocator implements ShardsAllocator {
* to the {@link MoveDecision} return object:
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
* 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
- * {@link MoveDecision#canRemainDecision} will have a decision type of YES. All other fields in the object will be null.
+ * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
* 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
* populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
- * {@link MoveDecision#targetNode} will return a non-null value, otherwise the assignedNodeId will be null.
+ * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null.
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
- * {@link MoveDecision#nodeDecisions} will have a non-null value.
+ * {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/
public MoveDecision decideMove(final ShardRouting shardRouting) {
if (shardRouting.started() == false) {
@@ -915,8 +909,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
continue;
}
- // simulate weight if we would add shard to node
- float currentWeight = weight.weightShardAdded(this, node, shard.getIndexName());
+ // weight of this index currently on the node
+ float currentWeight = weight.weight(this, node, shard.getIndexName());
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit
if (currentWeight > minWeight && explain == false) {
continue;
@@ -985,66 +979,54 @@ public class BalancedShardsAllocator implements ShardsAllocator {
);
}
+ private static final Comparator BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed();
+
/**
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
* balance model. Iff this method returns a true
the relocation has already been executed on the
* simulation model as well as on the cluster.
*/
- private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) {
+ private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx) {
final ModelIndex index = maxNode.getIndex(idx);
- Decision decision = null;
if (index != null) {
- if (logger.isTraceEnabled()) {
- logger.trace("Try relocating shard for index index [{}] from node [{}] to node [{}]", idx, maxNode.getNodeId(),
- minNode.getNodeId());
- }
- ShardRouting candidate = null;
+ logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
+ final Iterable shardRoutings = StreamSupport.stream(index.spliterator(), false)
+ .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway
+ .filter(maxNode::containsShard)
+ .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic
+ ::iterator;
+
final AllocationDeciders deciders = allocation.deciders();
- for (ShardRouting shard : index) {
- if (shard.started()) {
- // skip initializing, unassigned and relocating shards we can't relocate them anyway
- Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
- Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
- if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
- && ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
- if (maxNode.containsShard(shard)) {
- // simulate moving shard from maxNode to minNode
- final float delta = weight.weightShardAdded(
- this, minNode, idx) - weight.weightShardRemoved(this, maxNode, idx);
- if (delta < minCost ||
- (candidate != null && Float.compare(delta, minCost) == 0 && candidate.id() > shard.id())) {
- /* this last line is a tie-breaker to make the shard allocation alg deterministic
- * otherwise we rely on the iteration order of the index.getAllShards() which is a set.*/
- minCost = delta;
- candidate = shard;
- decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
- }
- }
- }
+ for (ShardRouting shard : shardRoutings) {
+ final Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
+ if (rebalanceDecision.type() == Type.NO) {
+ continue;
+ }
+ final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
+ if (allocationDecision.type() == Type.NO) {
+ continue;
}
- }
- if (candidate != null) {
- /* allocate on the model even if not throttled */
- maxNode.removeShard(candidate);
- long shardSize = allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+ final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
- if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */
- logger.debug("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(),
- minNode.getNodeId());
- /* now allocate on the cluster */
- minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize, allocation.changes()).v1());
+ maxNode.removeShard(shard);
+ long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
+
+ if (decision.type() == Type.YES) {
+ /* only allocate on the cluster if we are not throttled */
+ logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
+ minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
return true;
} else {
+ /* allocate on the model even if throttled */
+ logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
assert decision.type() == Type.THROTTLE;
- minNode.addShard(candidate.relocate(minNode.getNodeId(), shardSize));
+ minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
+ return false;
}
}
}
- if (logger.isTraceEnabled()) {
- logger.trace("Couldn't find shard to relocate from node [{}] to node [{}] allocation decision [{}]",
- maxNode.getNodeId(), minNode.getNodeId(), decision == null ? "NO" : decision.type().name());
- }
+ logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
return false;
}