Eliminate adding/removing shard to simulate weight of added shard / removed shard
Removal of the pattern node.addShard() -> calculate weight -> node.removeShard() which is expensive as, beside map lookups, it invalidates caching of precomputed values in ModelNode and ModelIndex. Replaced by adding an additional parameter to the weight function which accounts for the added / removed shard.
This commit is contained in:
parent
5bd31a6cca
commit
fc0a33be05
|
@ -189,8 +189,20 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
}
|
||||
|
||||
public float weight(Balancer balancer, ModelNode node, String index) {
|
||||
final float weightShard = (node.numShards() - balancer.avgShardsPerNode());
|
||||
final float weightIndex = (node.numShards(index) - balancer.avgShardsPerNode(index));
|
||||
return weight(balancer, node, index, 0);
|
||||
}
|
||||
|
||||
public float weightShardAdded(Balancer balancer, ModelNode node, String index) {
|
||||
return weight(balancer, node, index, 1);
|
||||
}
|
||||
|
||||
public float weightShardRemoved(Balancer balancer, ModelNode node, String index) {
|
||||
return weight(balancer, node, index, -1);
|
||||
}
|
||||
|
||||
private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
|
||||
final float weightShard = (node.numShards() + numAdditionalShards - balancer.avgShardsPerNode());
|
||||
final float weightIndex = (node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index));
|
||||
return theta0 * weightShard + theta1 * weightIndex;
|
||||
}
|
||||
|
||||
|
@ -627,20 +639,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
if (throttledNodes.contains(node)) {
|
||||
continue;
|
||||
}
|
||||
/*
|
||||
* The shard we add is removed below to simulate the
|
||||
* addition for weight calculation we use Decision.ALWAYS to
|
||||
* not violate the not null condition.
|
||||
*/
|
||||
if (!node.containsShard(shard)) {
|
||||
node.addShard(shard, Decision.ALWAYS);
|
||||
float currentWeight = weight.weight(this, node, shard.index());
|
||||
/*
|
||||
* Remove the shard from the node again this is only a
|
||||
* simulation
|
||||
*/
|
||||
Decision removed = node.removeShard(shard);
|
||||
assert removed != null;
|
||||
// simulate weight if we would add shard to node
|
||||
float currentWeight = weight.weightShardAdded(this, node, shard.index());
|
||||
/*
|
||||
* Unless the operation is not providing any gains we
|
||||
* don't check deciders
|
||||
|
@ -743,19 +744,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
final RoutingNode node = routingNodes.node(minNode.getNodeId());
|
||||
ShardRouting candidate = null;
|
||||
final AllocationDeciders deciders = allocation.deciders();
|
||||
/* make a copy since we modify this list in the loop */
|
||||
final ArrayList<ShardRouting> shards = new ArrayList<>(index.getAllShards());
|
||||
for (ShardRouting shard : shards) {
|
||||
for (ShardRouting shard : index.getAllShards()) {
|
||||
if (shard.started()) {
|
||||
// skip initializing, unassigned and relocating shards we can't relocate them anyway
|
||||
Decision allocationDecision = deciders.canAllocate(shard, node, allocation);
|
||||
Decision rebalanceDecision = deciders.canRebalance(shard, allocation);
|
||||
if (((allocationDecision.type() == Type.YES) || (allocationDecision.type() == Type.THROTTLE))
|
||||
&& ((rebalanceDecision.type() == Type.YES) || (rebalanceDecision.type() == Type.THROTTLE))) {
|
||||
Decision srcDecision;
|
||||
if ((srcDecision = maxNode.removeShard(shard)) != null) {
|
||||
minNode.addShard(shard, srcDecision);
|
||||
final float delta = weight.weight(this, minNode, idx) - weight.weight(this, maxNode, idx);
|
||||
if (maxNode.containsShard(shard)) {
|
||||
// simulate moving shard from maxNode to minNode
|
||||
final float delta = weight.weightShardAdded(this, minNode, idx) - weight.weightShardRemoved(this, maxNode, idx);
|
||||
if (delta < minCost ||
|
||||
(candidate != null && delta == minCost && candidate.id() > shard.id())) {
|
||||
/* this last line is a tie-breaker to make the shard allocation alg deterministic
|
||||
|
@ -764,8 +762,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
|
|||
candidate = shard;
|
||||
decision = new Decision.Multi().add(allocationDecision).add(rebalanceDecision);
|
||||
}
|
||||
minNode.removeShard(shard);
|
||||
maxNode.addShard(shard, srcDecision);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue