Introduced an Operation enum that is passed to each call of

WeightFunction#weight to allow dedicated weight calculations per operation. In certain
circumstances it is more efficient, or even required, to ignore certain factors in the weight
calculation, for instance to prevent relocations that are triggered solely by tie-breakers.
In particular, the primary balance property should not be taken into account when the delta for
early termination is calculated, since otherwise a relocation could be triggered solely by the
fact that two nodes have a different number of primaries allocated to them.

Closes #2984
This commit is contained in:
Simon Willnauer 2013-05-02 17:47:58 +02:00
parent ad92d82680
commit c9c10273a6
2 changed files with 251 additions and 30 deletions

View File

@ -74,9 +74,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
class ApplySettings implements NodeSettingsService.Listener {
@Override
public void onRefreshSettings(Settings settings) {
float indexBalance = settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, weightFunction.indexBalance);
float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, weightFunction.shardBalance);
float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, weightFunction.primaryBalance);
final float indexBalance = settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, weightFunction.indexBalance);
final float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, weightFunction.shardBalance);
final float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, weightFunction.primaryBalance);
float threshold = settings.getAsFloat(SETTING_THRESHOLD, BalancedShardsAllocator.this.threshold);
if (threshold <= 0.0f) {
throw new ElasticSearchIllegalArgumentException("threshold must be greater than 0.0f but was: " + threshold);
@ -87,7 +87,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR, DEFAULT_PRIMARY_BALANCE_FACTOR);
private volatile float threshold = 1.0f;
public BalancedShardsAllocator(Settings settings) {
this(settings, new NodeSettingsService(settings));
@ -185,25 +187,72 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private final float indexBalance;
private final float shardBalance;
private final float primaryBalance;
private final EnumMap<Operation, float[]> thetaMap = new EnumMap<BalancedShardsAllocator.Operation, float[]>(Operation.class);
/**
 * Creates a weight function from the three balance factors and pre-computes,
 * for every {@link Operation}, the normalized factor array (theta) that
 * {@code weight(...)} multiplies with the per-node deltas.
 *
 * Each theta array is ordered as { shard factor, index factor, primary factor }.
 * For {@link Operation#THRESHOLD_CHECK} the primary factor is forced to 0 and the
 * remaining two factors are normalized against their own sum, so the primary
 * balance cannot influence the threshold comparison. If the index and shard
 * factors do not sum to a positive value, the default theta is used instead.
 *
 * @param indexBalance   the index balance factor
 * @param shardBalance   the shard balance factor
 * @param primaryBalance the primary balance factor
 * @throws ElasticSearchIllegalArgumentException if the three factors do not sum to a value greater than 0
 */
public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) {
    final float sum = indexBalance + shardBalance + primaryBalance;
    if (sum <= 0.0f) {
        throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
    }
    final float[] defaultTheta = new float[] { shardBalance / sum, indexBalance / sum, primaryBalance / sum };
    // Normalization base for the threshold check, which ignores the primary factor.
    final float sumWithoutPrimary = indexBalance + shardBalance;
    final float[] thresholdTheta;
    if (sumWithoutPrimary <= 0.0f) {
        // Only the primary factor is set: fall back to the default theta instead of
        // dividing by a non-positive sum (which would yield NaN/Infinity factors).
        thresholdTheta = defaultTheta;
    } else {
        thresholdTheta = new float[] { shardBalance / sumWithoutPrimary, indexBalance / sumWithoutPrimary, 0 };
    }
    for (Operation operation : Operation.values()) {
        switch (operation) {
            case THRESHOLD_CHECK:
                thetaMap.put(operation, thresholdTheta);
                break;
            case BALANCE:
            case ALLOCATE:
            case MOVE:
                thetaMap.put(operation, defaultTheta);
                break;
            default:
                // A new Operation constant was added without a theta mapping.
                assert false;
        }
    }
    // The fields keep the factors exactly as passed in; only thetaMap holds normalized values.
    this.indexBalance = indexBalance;
    this.shardBalance = shardBalance;
    this.primaryBalance = primaryBalance;
}
public float weight(Balancer balancer, ModelNode node, String index) {
final float weightShard = shardBalance * (node.numShards() - balancer.avgShardsPerNode());
final float weightIndex = indexBalance * (node.numShards(index) - balancer.avgShardsPerNode(index));
final float weightPrimary = primaryBalance * (node.numPrimaries() - balancer.avgPrimariesPerNode());
return weightShard + weightIndex + weightPrimary;
/**
 * Computes the weight of {@code node} with respect to {@code index} for the
 * given operation.
 *
 * The weight is the sum of three deltas (the node's total shard count, its shard
 * count for {@code index}, and its primary count, each minus the corresponding
 * average reported by {@code balancer}), every delta scaled by the factor
 * registered for {@code operation} in {@code thetaMap}.
 *
 * @param operation the operation the weight is calculated for; selects the factor array
 * @param balancer  supplies the per-node averages
 * @param node      the node to weigh
 * @param index     the index the per-index delta is calculated for
 * @return the weighted sum of the three deltas
 */
public float weight(Operation operation, Balancer balancer, ModelNode node, String index) {
    final float shardDelta = node.numShards() - balancer.avgShardsPerNode();
    final float indexDelta = node.numShards(index) - balancer.avgShardsPerNode(index);
    final float primaryDelta = node.numPrimaries() - balancer.avgPrimariesPerNode();
    // Factor order is { shard, index, primary }.
    final float[] factors = thetaMap.get(operation);
    assert factors != null;
    return factors[0] * shardDelta + factors[1] * indexDelta + factors[2] * primaryDelta;
}
}
/**
 * An enum that denotes the actual operation the {@link WeightFunction} is
 * applied to.
 */
public enum Operation {
    /**
     * Provided during balance operations.
     */
    BALANCE,
    /**
     * Provided during initial allocation operation for unassigned shards.
     */
    ALLOCATE,
    /**
     * Provided during move operation.
     */
    MOVE,
    /**
     * Provided when the weight delta is checked against the configured threshold.
     * This can be used to ignore tie-breaking weight factors that should not
     * solely trigger a relocation unless the delta is above the threshold.
     */
    THRESHOLD_CHECK
}
/**
* A {@link Balancer}
@ -218,7 +267,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private final float threshold;
private final MetaData metaData;
private final Predicate<MutableShardRouting> assignedFilter = new Predicate<MutableShardRouting>() {
@Override
public boolean apply(MutableShardRouting input) {
@ -226,6 +275,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
};
public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) {
this.logger = logger;
this.allocation = allocation;
@ -325,8 +375,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
boolean changed = initialize(allocation.routingNodes());
NodeSorter sorter = newNodeSorter();
if (nodes.size() > 1) { /* skip if we only have one node */
for (String index : buildWeightOrderedIndidces(sorter)) {
sorter.reset(index);
for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) {
sorter.reset(Operation.BALANCE,index);
final float[] weights = sorter.weights;
final ModelNode[] modelNodes = sorter.modelNodes;
int lowIdx = 0;
@ -336,8 +386,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final ModelNode maxNode = modelNodes[highIdx];
if (maxNode.numShards(index) > 0) {
float delta = weights[highIdx] - weights[lowIdx];
delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
if (delta <= threshold) {
if (logger.isTraceEnabled()) {
logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]",
index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
@ -349,14 +401,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
* a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */
if (tryRelocateShard(minNode, maxNode, index, delta)) {
if (tryRelocateShard(Operation.BALANCE, minNode, maxNode, index, delta)) {
/*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily
* we could just find the place to insert linearly but the win might be minor
* compared to the added complexity
*/
weights[lowIdx] = sorter.weight(modelNodes[lowIdx]);
weights[highIdx] = sorter.weight(modelNodes[highIdx]);
weights[lowIdx] = sorter.weight(Operation.BALANCE, modelNodes[lowIdx]);
weights[highIdx] = sorter.weight(Operation.BALANCE, modelNodes[highIdx]);
sorter.quickSort(0, weights.length - 1);
lowIdx = 0;
highIdx = weights.length - 1;
@ -397,11 +449,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* average. To re-balance we need to move shards back eventually likely
* to the nodes we relocated them from.
*/
private String[] buildWeightOrderedIndidces(NodeSorter sorter) {
private String[] buildWeightOrderedIndidces(Operation operation, NodeSorter sorter) {
final String[] indices = this.indices.toArray(new String[this.indices.size()]);
final float[] deltas = new float[indices.length];
for (int i = 0; i < deltas.length; i++) {
sorter.reset(indices[i]);
sorter.reset(operation, indices[i]);
deltas[i] = sorter.delta();
}
new SorterTemplate() {
@ -459,7 +511,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
final ModelNode sourceNode = nodes.get(node.nodeId());
assert sourceNode != null;
final NodeSorter sorter = newNodeSorter();
sorter.reset(shard.getIndex());
sorter.reset(Operation.MOVE, shard.getIndex());
final ModelNode[] nodes = sorter.modelNodes;
assert sourceNode.containsShard(shard);
/*
@ -577,7 +629,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
*/
if (!node.containsShard(shard)) {
node.addShard(shard, Decision.ALWAYS);
float currentWeight = weight.weight(this, node, shard.index());
float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index());
/*
* Remove the shard from the node again this is only a
* simulation
@ -660,7 +712,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* balance model. Iff this method returns a <code>true</code> the relocation has already been executed on the
* simulation model as well as on the cluster.
*/
private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) {
private boolean tryRelocateShard(Operation operation, ModelNode minNode, ModelNode maxNode, String idx, float minCost) {
final ModelIndex index = maxNode.getIndex(idx);
if (index != null) {
if (logger.isTraceEnabled()) {
@ -684,7 +736,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
Decision srcDecision;
if ((srcDecision = maxNode.removeShard(shard)) != null) {
minNode.addShard(shard, srcDecision);
final float delta = weight.weight(this, minNode, idx) - weight.weight(this, maxNode, idx);
final float delta = weight.weight(operation, this, minNode, idx) - weight.weight(operation, this, maxNode, idx);
if (delta < minCost) {
minCost = delta;
candidate = shard;
@ -920,7 +972,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
private float pivotWeight;
public NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) {
this.function = function;
this.balancer = balancer;
this.modelNodes = modelNodes;
@ -931,16 +982,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
* Resets the sorter, recalculates the weights per node and sorts the
* nodes by weight, with minimal weight first.
*/
public void reset(String index) {
public void reset(Operation operation, String index) {
this.index = index;
for (int i = 0; i < weights.length; i++) {
weights[i] = weight(modelNodes[i]);
weights[i] = weight(operation, modelNodes[i]);
}
quickSort(0, modelNodes.length - 1);
}
public float weight(ModelNode node) {
return function.weight(balancer, node, index);
public float weight(Operation operation, ModelNode node) {
return function.weight(operation, balancer, node, index);
}
@Override

View File

@ -30,22 +30,33 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import java.util.List;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator;
import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators;
import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.gateway.none.NoneGatewayAllocator;
import org.elasticsearch.node.settings.NodeSettingsService;
import org.elasticsearch.node.settings.NodeSettingsService.Listener;
import org.hamcrest.Matchers;
import org.testng.annotations.Test;
@ -353,4 +364,163 @@ public class BalanceConfigurationTests {
assertThat(allocator.getThreshold(), Matchers.equalTo(3.0f));
}
/**
 * Builds a 4-node cluster holding one index ("test", 5 shards, 1 replica) whose
 * shards are placed by a hand-written allocator in a fixed layout, starts all
 * shards, then reroutes with an {@link AllocationService} created from settings
 * only and asserts that every shard is still STARTED afterwards.
 * The fixed layout puts two primaries on node1 and node3, one on node2 and none
 * on node0 (see the routing table in the comment below).
 */
@Test
public void testNoRebalanceOnPrimaryOverload() {
ImmutableSettings.Builder settings = settingsBuilder();
// First strategy: a custom ShardsAllocator that never rebalances or moves shards
// and assigns each unassigned shard to a hard-coded node.
AllocationService strategy = new AllocationService(settings.build(), new AllocationDeciders(settings.build(),
new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)), new ShardsAllocators(settings.build(),
new NoneGatewayAllocator(), new ShardsAllocator() {
@Override
public boolean rebalance(RoutingAllocation allocation) {
return false;
}
@Override
public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
return false;
}
@Override
public void applyStartedShards(StartedRerouteAllocation allocation) {
}
@Override
public void applyFailedShards(FailedRerouteAllocation allocation) {
}
/*
* This allocator rebuilds the scenario below, in which a rebalance is
* triggered solely by the primary overload on node [1] and a shard
* is rebalanced to node 0.
routing_nodes:
-----node_id[0][V]
--------[test][0], node[0], [R], s[STARTED]
--------[test][4], node[0], [R], s[STARTED]
-----node_id[1][V]
--------[test][0], node[1], [P], s[STARTED]
--------[test][1], node[1], [P], s[STARTED]
--------[test][3], node[1], [R], s[STARTED]
-----node_id[2][V]
--------[test][1], node[2], [R], s[STARTED]
--------[test][2], node[2], [R], s[STARTED]
--------[test][4], node[2], [P], s[STARTED]
-----node_id[3][V]
--------[test][2], node[3], [P], s[STARTED]
--------[test][3], node[3], [P], s[STARTED]
---- unassigned
*/
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
List<MutableShardRouting> unassigned = allocation.routingNodes().unassigned();
// Reports a change whenever there was at least one unassigned shard.
boolean changed = !unassigned.isEmpty();
for (MutableShardRouting sr : unassigned) {
// Shard id plus primary/replica flag select the target node; ids outside 0-4 are not assigned.
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().node("node1").add(sr);
} else {
allocation.routingNodes().node("node0").add(sr);
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().node("node1").add(sr);
} else {
allocation.routingNodes().node("node2").add(sr);
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().node("node3").add(sr);
} else {
allocation.routingNodes().node("node2").add(sr);
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().node("node3").add(sr);
} else {
allocation.routingNodes().node("node1").add(sr);
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().node("node2").add(sr);
} else {
allocation.routingNodes().node("node0").add(sr);
}
break;
}
}
// The unassigned list is emptied unconditionally once the loop is done.
unassigned.clear();
return changed;
}
}));
// Cluster metadata: a single index "test" with 5 shards and 1 replica each.
MetaData.Builder metaDataBuilder = newMetaDataBuilder();
RoutingTable.Builder routingTableBuilder = routingTable();
IndexMetaData.Builder indexMeta = newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1);
metaDataBuilder = metaDataBuilder.put(indexMeta);
MetaData metaData = metaDataBuilder.build();
for (IndexMetaData index : metaData.indices().values()) {
routingTableBuilder.addAsNew(index);
}
RoutingTable routingTable = routingTableBuilder.build();
// Four nodes named node0..node3, matching the names used by the custom allocator.
DiscoveryNodes.Builder nodes = newNodesBuilder();
for (int i = 0; i < 4; i++) {
DiscoveryNode node = newNode("node"+i);
nodes.put(node);
}
ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build();
// First reroute goes through the custom allocator; every shard on a node must be INITIALIZING afterwards.
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
RoutingNodes routingNodes = clusterState.routingNodes();
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting mutableShardRouting : routingNode) {
assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING));
}
}
// From here on use an AllocationService built from settings only.
// NOTE(review): presumably this is backed by the allocator under test (the log message calls it "the new allocator") - confirm.
strategy = new AllocationService(settings.build());
logger.info("use the new allocator and check if it moves shards");
routingNodes = clusterState.routingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting mutableShardRouting : routingNode) {
assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED));
}
}
logger.info("start the replica shards");
// NOTE(review): all shards were just asserted to be STARTED, so this call presumably has nothing left to start - confirm.
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting mutableShardRouting : routingNode) {
assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED));
}
}
logger.info("rebalancing");
// Final reroute: the test passes only if every shard is still STARTED, i.e. no shard changed state.
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
for (RoutingNode routingNode : routingNodes) {
for (MutableShardRouting mutableShardRouting : routingNode) {
assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED));
}
}
}
}