Simplify rebalancer's weight function (#51632)

This commit inlines the `weightShardAdded` and `weightShardRemoved` methods
from the `BalancedShardsAllocator#WeightFunction` that respectively add and
subtract 1 (±ε) from the result of `weight`. It then follows up with a number
of simplifications that this inlining enables.

As a side-effect it also somewhat reduces the number of calls to canRebalance
and canAllocate during rebalancing when there are multiple shards of the same
index on a node that is heavier than average.
This commit is contained in:
David Turner 2020-01-31 14:34:45 +00:00
parent 86f3b47299
commit 39a3a950de
1 changed files with 58 additions and 76 deletions

View File

@ -60,6 +60,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.stream.StreamSupport;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING; import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
@ -205,15 +206,14 @@ public class BalancedShardsAllocator implements ShardsAllocator {
* </ul> * </ul>
* <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code> * <code>weight(node, index) = weight<sub>index</sub>(node, index) + weight<sub>node</sub>(node, index)</code>
*/ */
public static class WeightFunction { private static class WeightFunction {
private final float indexBalance; private final float indexBalance;
private final float shardBalance; private final float shardBalance;
private final float theta0; private final float theta0;
private final float theta1; private final float theta1;
WeightFunction(float indexBalance, float shardBalance) {
public WeightFunction(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance; float sum = indexBalance + shardBalance;
if (sum <= 0.0f) { if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
@ -224,21 +224,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
this.shardBalance = shardBalance; this.shardBalance = shardBalance;
} }
public float weight(Balancer balancer, ModelNode node, String index) { float weight(Balancer balancer, ModelNode node, String index) {
return weight(balancer, node, index, 0); final float weightShard = node.numShards() - balancer.avgShardsPerNode();
} final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
public float weightShardAdded(Balancer balancer, ModelNode node, String index) {
return weight(balancer, node, index, 1);
}
public float weightShardRemoved(Balancer balancer, ModelNode node, String index) {
return weight(balancer, node, index, -1);
}
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex; return theta0 * weightShard + theta1 * weightIndex;
} }
} }
@ -377,9 +365,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
assert currentNode != null : "currently assigned node could not be found"; assert currentNode != null : "currently assigned node could not be found";
// balance the shard, if a better node can be found // balance the shard, if a better node can be found
final float currentWeight = sorter.weight(currentNode);
final AllocationDeciders deciders = allocation.deciders();
final String idxName = shard.getIndexName(); final String idxName = shard.getIndexName();
final float currentWeight = weight.weight(this, currentNode, idxName);
final AllocationDeciders deciders = allocation.deciders();
Type rebalanceDecisionType = Type.NO; Type rebalanceDecisionType = Type.NO;
ModelNode assignedNode = null; ModelNode assignedNode = null;
List<Tuple<ModelNode, Decision>> betterBalanceNodes = new ArrayList<>(); List<Tuple<ModelNode, Decision>> betterBalanceNodes = new ArrayList<>();
@ -394,7 +382,7 @@ public class BalancedShardsAllocator implements ShardsAllocator {
// this is a comparison of the number of shards on this node to the number of shards // this is a comparison of the number of shards on this node to the number of shards
// that should be on each node on average (both taking the cluster as a whole into account // that should be on each node on average (both taking the cluster as a whole into account
// as well as shards per index) // as well as shards per index)
final float nodeWeight = sorter.weight(node); final float nodeWeight = weight.weight(this, node, idxName);
// if the node we are examining has a worse (higher) weight than the node the shard is // if the node we are examining has a worse (higher) weight than the node the shard is
// assigned to, then there is no way moving the shard to the node with the worse weight // assigned to, then there is no way moving the shard to the node with the worse weight
// can make the balance of the cluster better, so we check for that here // can make the balance of the cluster better, so we check for that here
@ -408,12 +396,9 @@ public class BalancedShardsAllocator implements ShardsAllocator {
// more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless // more even, it doesn't make sense to execute the heavyweight operation of relocating a shard unless
// the gains make it worth it, as defined by the threshold // the gains make it worth it, as defined by the threshold
boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false; boolean deltaAboveThreshold = lessThan(currentDelta, threshold) == false;
// simulate the weight of the node if we were to relocate the shard to it
float weightWithShardAdded = weight.weightShardAdded(this, node, idxName);
// calculate the delta of the weights of the two nodes if we were to add the shard to the // calculate the delta of the weights of the two nodes if we were to add the shard to the
// node in question and move it away from the node that currently holds it. // node in question and move it away from the node that currently holds it.
float proposedDelta = weightWithShardAdded - weight.weightShardRemoved(this, currentNode, idxName); boolean betterWeightWithShardAdded = nodeWeight + 1.0f < currentWeight;
boolean betterWeightWithShardAdded = proposedDelta < currentDelta;
rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded; rebalanceConditionsMet = deltaAboveThreshold && betterWeightWithShardAdded;
// if the simulated weight delta with the shard moved away is better than the weight delta // if the simulated weight delta with the shard moved away is better than the weight delta
// with the shard remaining on the current node, and we are allowed to allocate to the // with the shard remaining on the current node, and we are allowed to allocate to the
@ -538,9 +523,18 @@ public class BalancedShardsAllocator implements ShardsAllocator {
logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]", logger.trace("Balancing from node [{}] weight: [{}] to node [{}] weight: [{}] delta: [{}]",
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
} }
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes. if (delta <= 1.0f) {
* a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */ /*
if (tryRelocateShard(minNode, maxNode, index, delta)) { * prevent relocations that only swap the weights of the two nodes. a relocation must bring us closer to the
* balance if we only achieve the same delta the relocation is useless
*
* NB this comment above was preserved from an earlier version but doesn't obviously describe the code today. We
* already know that lessThan(delta, threshold) == false and threshold defaults to 1.0, so by default we never
* hit this case anyway.
*/
logger.trace("Couldn't find shard to relocate from node [{}] to node [{}]",
maxNode.getNodeId(), minNode.getNodeId());
} else if (tryRelocateShard(minNode, maxNode, index)) {
/* /*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily * TODO we could be a bit smarter here, we don't need to fully sort necessarily
* we could just find the place to insert linearly but the win might be minor * we could just find the place to insert linearly but the win might be minor
@ -666,12 +660,12 @@ public class BalancedShardsAllocator implements ShardsAllocator {
* to the {@link MoveDecision} return object: * to the {@link MoveDecision} return object:
* 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false. * 1. If the shard is not started, no decision will be taken and {@link MoveDecision#isDecisionTaken()} will return false.
* 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and * 2. If the shard is allowed to remain on its current node, no attempt will be made to move the shard and
* {@link MoveDecision#canRemainDecision} will have a decision type of YES. All other fields in the object will be null. * {@link MoveDecision#getCanRemainDecision} will have a decision type of YES. All other fields in the object will be null.
* 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be * 3. If the shard is not allowed to remain on its current node, then {@link MoveDecision#getAllocationDecision()} will be
* populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then * populated with the decision of moving to another node. If {@link MoveDecision#forceMove()} ()} returns {@code true}, then
* {@link MoveDecision#targetNode} will return a non-null value, otherwise the assignedNodeId will be null. * {@link MoveDecision#getTargetNode} will return a non-null value, otherwise the assignedNodeId will be null.
* 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then * 4. If the method is invoked in explain mode (e.g. from the cluster allocation explain APIs), then
* {@link MoveDecision#nodeDecisions} will have a non-null value. * {@link MoveDecision#getNodeDecisions} will have a non-null value.
*/ */
public MoveDecision decideMove(final ShardRouting shardRouting) { public MoveDecision decideMove(final ShardRouting shardRouting) {
if (shardRouting.started() == false) { if (shardRouting.started() == false) {
@ -915,8 +909,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {
continue; continue;
} }
// simulate weight if we would add shard to node // weight of this index currently on the node
float currentWeight = weight.weightShardAdded(this, node, shard.getIndexName()); float currentWeight = weight.weight(this, node, shard.getIndexName());
// moving the shard would not improve the balance, and we are not in explain mode, so short circuit // moving the shard would not improve the balance, and we are not in explain mode, so short circuit
if (currentWeight > minWeight && explain == false) { if (currentWeight > minWeight && explain == false) {
continue; continue;
@ -985,66 +979,54 @@ public class BalancedShardsAllocator implements ShardsAllocator {
); );
} }
private static final Comparator<ShardRouting> BY_DESCENDING_SHARD_ID = Comparator.comparing(ShardRouting::shardId).reversed();
/** /**
* Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the * Tries to find a relocation from the max node to the minimal node for an arbitrary shard of the given index on the
* balance model. Iff this method returns a <code>true</code> the relocation has already been executed on the * balance model. Iff this method returns a <code>true</code> the relocation has already been executed on the
* simulation model as well as on the cluster. * simulation model as well as on the cluster.
*/ */
private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) { private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx) {
final ModelIndex index = maxNode.getIndex(idx); final ModelIndex index = maxNode.getIndex(idx);
Decision decision = null;
if (index != null) { if (index != null) {
if (logger.isTraceEnabled()) { logger.trace("Try relocating shard of [{}] from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
logger.trace("Try relocating shard for index index [{}] from node [{}] to node [{}]", idx, maxNode.getNodeId(), final Iterable<ShardRouting> shardRoutings = StreamSupport.stream(index.spliterator(), false)
minNode.getNodeId()); .filter(ShardRouting::started) // cannot rebalance unassigned, initializing or relocating shards anyway
} .filter(maxNode::containsShard)
ShardRouting candidate = null; .sorted(BY_DESCENDING_SHARD_ID) // check in descending order of shard id so that the decision is deterministic
::iterator;
final AllocationDeciders deciders = allocation.deciders(); final AllocationDeciders deciders = allocation.deciders();
for (ShardRouting shard : index) { for (ShardRouting shard : shardRoutings) {
if (shard.started()) { final Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
// skip initializing, unassigned and relocating shards we can't relocate them anyway if (rebalanceDecision.type() == Type.NO) {
Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation); continue;
Decision rebalanceDecision = deciders.canRebalance(shard, allocation); }
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE)) final Decision allocationDecision = deciders.canAllocate(shard, minNode.getRoutingNode(), allocation);
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) { if (allocationDecision.type() == Type.NO) {
if (maxNode.containsShard(shard)) { continue;
// simulate moving shard from maxNode to minNode
final float delta = weight.weightShardAdded(
this, minNode, idx) - weight.weightShardRemoved(this, maxNode, idx);
if (delta < minCost ||
(candidate != null && Float.compare(delta, minCost) == 0 && candidate.id() > shard.id())) {
/* this last line is a tie-breaker to make the shard allocation alg deterministic
* otherwise we rely on the iteration order of the index.getAllShards() which is a set.*/
minCost = delta;
candidate = shard;
decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
}
}
}
} }
}
if (candidate != null) { final Decision decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
/* allocate on the model even if not throttled */
maxNode.removeShard(candidate);
long shardSize = allocation.clusterInfo().getShardSize(candidate, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
if (decision.type() == Type.YES) { /* only allocate on the cluster if we are not throttled */ maxNode.removeShard(shard);
logger.debug("Relocate shard [{}] from node [{}] to node [{}]", candidate, maxNode.getNodeId(), long shardSize = allocation.clusterInfo().getShardSize(shard, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
minNode.getNodeId());
/* now allocate on the cluster */ if (decision.type() == Type.YES) {
minNode.addShard(routingNodes.relocateShard(candidate, minNode.getNodeId(), shardSize, allocation.changes()).v1()); /* only allocate on the cluster if we are not throttled */
logger.debug("Relocate [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
minNode.addShard(routingNodes.relocateShard(shard, minNode.getNodeId(), shardSize, allocation.changes()).v1());
return true; return true;
} else { } else {
/* allocate on the model even if throttled */
logger.debug("Simulate relocation of [{}] from [{}] to [{}]", shard, maxNode.getNodeId(), minNode.getNodeId());
assert decision.type() == Type.THROTTLE; assert decision.type() == Type.THROTTLE;
minNode.addShard(candidate.relocate(minNode.getNodeId(), shardSize)); minNode.addShard(shard.relocate(minNode.getNodeId(), shardSize));
return false;
} }
} }
} }
if (logger.isTraceEnabled()) { logger.trace("No shards of [{}] can relocate from [{}] to [{}]", idx, maxNode.getNodeId(), minNode.getNodeId());
logger.trace("Couldn't find shard to relocate from node [{}] to node [{}] allocation decision [{}]",
maxNode.getNodeId(), minNode.getNodeId(), decision == null ? "NO" : decision.type().name());
}
return false; return false;
} }