BalancedShardAllocator code improvements (#20746)

This commit improves the logic flow of BalancedShardsAllocator in
preparation for separating out components of this class to be used
in the cluster allocation explain APIs.  In particular, this commit:

 1. Adds a minimum value for the index/shard balance factor settings (0.0)
 2. Makes the Balancer data structures immutable and pre-calculated at
    construction time.
 3. Removes difficult to follow labeled blocks / GOTOs
 4. Better logic for skipping over the same replica set when one of
    the replicas received a NO decision
 5. Separates the decision making logic for a single shard from the logic
    to iterate over all unassigned shards.
This commit is contained in:
Ali Beyad 2016-10-05 14:23:25 -04:00 committed by GitHub
parent 8e27d741c0
commit 15950b71b8

View File

@ -73,9 +73,9 @@ import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
public class BalancedShardsAllocator extends AbstractComponent implements ShardsAllocator { public class BalancedShardsAllocator extends AbstractComponent implements ShardsAllocator {
public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING =
Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, Property.Dynamic, Property.NodeScope); Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING =
Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, Property.Dynamic, Property.NodeScope); Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope);
public static final Setting<Float> THRESHOLD_SETTING = public static final Setting<Float> THRESHOLD_SETTING =
Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f,
Property.Dynamic, Property.NodeScope); Property.Dynamic, Property.NodeScope);
@ -210,7 +210,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
*/ */
public static class Balancer { public static class Balancer {
private final Logger logger; private final Logger logger;
private final Map<String, ModelNode> nodes = new HashMap<>(); private final Map<String, ModelNode> nodes;
private final RoutingAllocation allocation; private final RoutingAllocation allocation;
private final RoutingNodes routingNodes; private final RoutingNodes routingNodes;
private final WeightFunction weight; private final WeightFunction weight;
@ -218,6 +218,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private final float threshold; private final float threshold;
private final MetaData metaData; private final MetaData metaData;
private final float avgShardsPerNode; private final float avgShardsPerNode;
private final NodeSorter sorter;
public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { public Balancer(Logger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
this.logger = logger; this.logger = logger;
@ -227,7 +228,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
this.routingNodes = allocation.routingNodes(); this.routingNodes = allocation.routingNodes();
this.metaData = allocation.metaData(); this.metaData = allocation.metaData();
avgShardsPerNode = ((float) metaData.getTotalNumberOfShards()) / routingNodes.size(); avgShardsPerNode = ((float) metaData.getTotalNumberOfShards()) / routingNodes.size();
buildModelFromAssigned(); nodes = Collections.unmodifiableMap(buildModelFromAssigned());
sorter = newNodeSorter();
} }
/** /**
@ -304,11 +306,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) { public Map<DiscoveryNode, Float> weighShard(ShardRouting shard) {
final NodeSorter sorter = newNodeSorter();
final ModelNode[] modelNodes = sorter.modelNodes; final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights; final float[] weights = sorter.weights;
buildWeightOrderedIndices(sorter); buildWeightOrderedIndices();
Map<DiscoveryNode, Float> nodes = new HashMap<>(modelNodes.length); Map<DiscoveryNode, Float> nodes = new HashMap<>(modelNodes.length);
float currentNodeWeight = 0.0f; float currentNodeWeight = 0.0f;
for (int i = 0; i < modelNodes.length; i++) { for (int i = 0; i < modelNodes.length; i++) {
@ -332,20 +333,19 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* weight of the maximum node and the minimum node according to the * weight of the maximum node and the minimum node according to the
* {@link WeightFunction}. This weight is calculated per index to * {@link WeightFunction}. This weight is calculated per index to
* distribute shards evenly per index. The balancer tries to relocate * distribute shards evenly per index. The balancer tries to relocate
* shards only if the delta exceeds the threshold. If the default case * shards only if the delta exceeds the threshold. In the default case
* the threshold is set to <tt>1.0</tt> to enforce gaining relocation * the threshold is set to <tt>1.0</tt> to enforce gaining relocation
* only, or in other words relocations that move the weight delta closer * only, or in other words relocations that move the weight delta closer
* to <tt>0.0</tt> * to <tt>0.0</tt>
*/ */
private void balanceByWeights() { private void balanceByWeights() {
final NodeSorter sorter = newNodeSorter();
final AllocationDeciders deciders = allocation.deciders(); final AllocationDeciders deciders = allocation.deciders();
final ModelNode[] modelNodes = sorter.modelNodes; final ModelNode[] modelNodes = sorter.modelNodes;
final float[] weights = sorter.weights; final float[] weights = sorter.weights;
for (String index : buildWeightOrderedIndices(sorter)) { for (String index : buildWeightOrderedIndices()) {
IndexMetaData indexMetaData = metaData.index(index); IndexMetaData indexMetaData = metaData.index(index);
// find nodes that have a shard of this index or where shards of this index are allowed to stay // find nodes that have a shard of this index or where shards of this index are allowed to be allocated to,
// move these nodes to the front of modelNodes so that we can only balance based on these nodes // move these nodes to the front of modelNodes so that we can only balance based on these nodes
int relevantNodes = 0; int relevantNodes = 0;
for (int i = 0; i < modelNodes.length; i++) { for (int i = 0; i < modelNodes.length; i++) {
@ -440,14 +440,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* allocations on added nodes from one index when the weight parameters * allocations on added nodes from one index when the weight parameters
* for global balance overrule the index balance at an intermediate * for global balance overrule the index balance at an intermediate
* state. For example this can happen if we have 3 nodes and 3 indices * state. For example this can happen if we have 3 nodes and 3 indices
* with 3 shards and 1 shard. At the first stage all three nodes hold * with 3 primary and 1 replica shards. At the first stage all three nodes hold
* 2 shard for each index. now we add another node and the first index * 2 shard for each index. Now we add another node and the first index
* is balanced moving 3 two of the nodes over to the new node since it * is balanced moving three shards from two of the nodes over to the new node since it
* has no shards yet and global balance for the node is way below * has no shards yet and global balance for the node is way below
* average. To re-balance we need to move shards back eventually likely * average. To re-balance we need to move shards back eventually likely
* to the nodes we relocated them from. * to the nodes we relocated them from.
*/ */
private String[] buildWeightOrderedIndices(NodeSorter sorter) { private String[] buildWeightOrderedIndices() {
final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class); final String[] indices = allocation.routingTable().indicesRouting().keys().toArray(String.class);
final float[] deltas = new float[indices.length]; final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) { for (int i = 0; i < deltas.length; i++) {
@ -501,7 +501,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
// Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling // Iterate over the started shards interleaving between nodes, and check if they can remain. In the presence of throttling
// shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are // shard movements, the goal of this iteration order is to achieve a fairer movement of shards from the nodes that are
// offloading the shards. // offloading the shards.
final NodeSorter sorter = newNodeSorter();
for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) { for (Iterator<ShardRouting> it = allocation.routingNodes().nodeInterleavedShardIterator(); it.hasNext(); ) {
ShardRouting shardRouting = it.next(); ShardRouting shardRouting = it.next();
// we can only move started shards... // we can only move started shards...
@ -511,7 +510,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
RoutingNode routingNode = sourceNode.getRoutingNode(); RoutingNode routingNode = sourceNode.getRoutingNode();
Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation); Decision decision = allocation.deciders().canRemain(shardRouting, routingNode, allocation);
if (decision.type() == Decision.Type.NO) { if (decision.type() == Decision.Type.NO) {
moveShard(sorter, shardRouting, sourceNode, routingNode); moveShard(shardRouting, sourceNode, routingNode);
} }
} }
} }
@ -520,7 +519,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
/** /**
* Move started shard to the minimal eligible node with respect to the weight function * Move started shard to the minimal eligible node with respect to the weight function
*/ */
private void moveShard(NodeSorter sorter, ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) { private void moveShard(ShardRouting shardRouting, ModelNode sourceNode, RoutingNode routingNode) {
logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node()); logger.debug("[{}][{}] allocated on [{}], but can no longer be allocated on it, moving...", shardRouting.index(), shardRouting.id(), routingNode.node());
sorter.reset(shardRouting.getIndexName()); sorter.reset(shardRouting.getIndexName());
/* /*
@ -557,7 +556,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* on the target node which we respect during the allocation / balancing * on the target node which we respect during the allocation / balancing
* process. In short, this method recreates the status-quo in the cluster. * process. In short, this method recreates the status-quo in the cluster.
*/ */
private void buildModelFromAssigned() { private Map<String, ModelNode> buildModelFromAssigned() {
Map<String, ModelNode> nodes = new HashMap<>();
for (RoutingNode rn : routingNodes) { for (RoutingNode rn : routingNodes) {
ModelNode node = new ModelNode(rn); ModelNode node = new ModelNode(rn);
nodes.put(rn.nodeId(), node); nodes.put(rn.nodeId(), node);
@ -572,6 +572,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
} }
} }
return nodes;
} }
/** /**
@ -626,22 +627,85 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
do { do {
for (int i = 0; i < primaryLength; i++) { for (int i = 0; i < primaryLength; i++) {
ShardRouting shard = primary[i]; ShardRouting shard = primary[i];
if (!shard.primary()) { Tuple<Decision, ModelNode> allocationDecision = allocateUnassignedShard(shard, throttledNodes);
final Decision decision = deciders.canAllocate(shard, allocation); final Decision decision = allocationDecision.v1();
if (decision.type() == Type.NO) { final ModelNode minNode = allocationDecision.v2();
UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decision);
unassigned.ignoreShard(shard, allocationStatus, allocation.changes()); if (decision.type() == Type.YES) {
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) { if (logger.isTraceEnabled()) {
unassigned.ignoreShard(primary[++i], allocationStatus, allocation.changes()); logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
} }
continue;
} else { final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes());
minNode.addShard(shard);
if (!shard.primary()) {
// copy over the same replica shards to the secondary array so they will get allocated
// in a subsequent iteration, allowing replicas of other shards to be allocated first
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) { while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
secondary[secondaryLength++] = primary[++i]; secondary[secondaryLength++] = primary[++i];
} }
} }
} else {
// did *not* receive a YES decision
if (logger.isTraceEnabled()) {
logger.trace("No eligible node found to assign shard [{}] decision [{}]", shard, decision.type());
} }
assert !shard.assignedToNode() : shard;
if (minNode != null) {
// throttle decision scenario
assert decision.type() == Type.THROTTLE;
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
final RoutingNode node = minNode.getRoutingNode();
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
if (nodeLevelDecision != Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
}
assert nodeLevelDecision == Type.NO;
throttledNodes.add(minNode);
}
} else {
assert decision.type() == Type.NO;
if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
}
}
UnassignedInfo.AllocationStatus allocationStatus = UnassignedInfo.AllocationStatus.fromDecision(decision);
unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
while(i < primaryLength-1 && comparator.compare(primary[i], primary[i+1]) == 0) {
unassigned.ignoreShard(primary[++i], allocationStatus, allocation.changes());
}
}
}
}
primaryLength = secondaryLength;
ShardRouting[] tmp = primary;
primary = secondary;
secondary = tmp;
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
}
/**
* Make a decision for allocating an unassigned shard. This method returns a two values in a tuple: the
* first value is the {@link Decision} taken to allocate the unassigned shard, the second value is the
* {@link ModelNode} representing the node that the shard should be assigned to. If the decision returned
* is of type {@link Type#NO}, then the assigned node will be null.
*/
private Tuple<Decision, ModelNode> allocateUnassignedShard(final ShardRouting shard, final Set<ModelNode> throttledNodes) {
assert !shard.assignedToNode() : "not an unassigned shard: " + shard;
if (allocation.deciders().canAllocate(shard, allocation).type() == Type.NO) {
// NO decision for allocating the shard, irrespective of any particular node, so exit early
return Tuple.tuple(Decision.NO, null);
}
/* find an node with minimal weight we can allocate on*/ /* find an node with minimal weight we can allocate on*/
float minWeight = Float.POSITIVE_INFINITY; float minWeight = Float.POSITIVE_INFINITY;
ModelNode minNode = null; ModelNode minNode = null;
@ -661,9 +725,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* don't check deciders * don't check deciders
*/ */
if (currentWeight <= minWeight) { if (currentWeight <= minWeight) {
Decision currentDecision = deciders.canAllocate(shard, node.getRoutingNode(), allocation); Decision currentDecision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
NOUPDATE:
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
final boolean updateMinNode;
if (currentWeight == minWeight) { if (currentWeight == minWeight) {
/* we have an equal weight tie breaking: /* we have an equal weight tie breaking:
* 1. if one decision is YES prefer it * 1. if one decision is YES prefer it
@ -680,16 +744,17 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final int repId = shard.id(); final int repId = shard.id();
final int nodeHigh = node.highestPrimary(shard.index().getName()); final int nodeHigh = node.highestPrimary(shard.index().getName());
final int minNodeHigh = minNode.highestPrimary(shard.getIndexName()); final int minNodeHigh = minNode.highestPrimary(shard.getIndexName());
if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh)) updateMinNode = ((((nodeHigh > repId && minNodeHigh > repId)
|| (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) { || (nodeHigh < repId && minNodeHigh < repId))
// nothing to set here; the minNode, minWeight, and decision get set below && (nodeHigh < minNodeHigh))
|| (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId));
} else { } else {
break NOUPDATE; updateMinNode = currentDecision.type() == Type.YES;
}
} else if (currentDecision.type() != Type.YES) {
break NOUPDATE;
} }
} else {
updateMinNode = true;
} }
if (updateMinNode) {
minNode = node; minNode = node;
minWeight = currentWeight; minWeight = currentWeight;
decision = currentDecision; decision = currentDecision;
@ -698,54 +763,12 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
} }
} }
} }
assert (decision == null) == (minNode == null);
if (minNode != null) {
final long shardSize = DiskThresholdDecider.getExpectedShardSize(shard, allocation,
ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
if (decision.type() == Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
} }
if (decision == null) {
shard = routingNodes.initializeShard(shard, minNode.getNodeId(), null, shardSize, allocation.changes()); // decision was not set and a node was not assigned, so treat it as a NO decision
minNode.addShard(shard); decision = Decision.NO;
continue; // don't add to ignoreUnassigned
} else {
minNode.addShard(shard.initialize(minNode.getNodeId(), null, shardSize));
final RoutingNode node = minNode.getRoutingNode();
final Decision.Type nodeLevelDecision = deciders.canAllocate(node, allocation).type();
if (nodeLevelDecision != Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate on node [{}] remove from round decision [{}]", node, decision.type());
} }
assert nodeLevelDecision == Type.NO; return Tuple.tuple(decision, minNode);
throttledNodes.add(minNode);
}
}
if (logger.isTraceEnabled()) {
logger.trace("No eligible node found to assign shard [{}] decision [{}]", shard, decision.type());
}
} else if (logger.isTraceEnabled()) {
logger.trace("No Node found to assign shard [{}]", shard);
}
assert decision == null || decision.type() == Type.THROTTLE;
UnassignedInfo.AllocationStatus allocationStatus =
decision == null ? UnassignedInfo.AllocationStatus.DECIDERS_NO :
UnassignedInfo.AllocationStatus.fromDecision(decision);
unassigned.ignoreShard(shard, allocationStatus, allocation.changes());
if (!shard.primary()) { // we could not allocate it and we are a replica - check if we can ignore the other replicas
while(secondaryLength > 0 && comparator.compare(shard, secondary[secondaryLength-1]) == 0) {
unassigned.ignoreShard(secondary[--secondaryLength], allocationStatus, allocation.changes());
}
}
}
primaryLength = secondaryLength;
ShardRouting[] tmp = primary;
primary = secondary;
secondary = tmp;
secondaryLength = 0;
} while (primaryLength > 0);
// clear everything we have either added it or moved to ignoreUnassigned
} }
/** /**