From c9c10273a66df858bca2e45b6b27051c969b78d6 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Thu, 2 May 2013 17:47:58 +0200 Subject: [PATCH] Introduced an Operation enum that is passed to each call of WeightFunction#weight to allow dedicated weight calculations per operation. In certain circumstances it is more efficient / required to ignore certain factors in the weight calculation to prevent, for instance, relocations if they are solely triggered by tie-breakers. In particular the primary balance property should not be taken into account if the delta for early termination is calculated since otherwise a relocation could be triggered solely by the fact that two nodes have a different number of primaries allocated to them. Closes #2984 --- .../allocator/BalancedShardsAllocator.java | 109 ++++++++--- .../allocation/BalanceConfigurationTests.java | 172 +++++++++++++++++- 2 files changed, 251 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index f59e44c359b..0e99e30b1a2 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -74,9 +74,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards class ApplySettings implements NodeSettingsService.Listener { @Override public void onRefreshSettings(Settings settings) { - float indexBalance = settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, weightFunction.indexBalance); - float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, weightFunction.shardBalance); - float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, weightFunction.primaryBalance); + final float indexBalance = 
settings.getAsFloat(SETTING_INDEX_BALANCE_FACTOR, weightFunction.indexBalance); + final float shardBalance = settings.getAsFloat(SETTING_SHARD_BALANCE_FACTOR, weightFunction.shardBalance); + final float primaryBalance = settings.getAsFloat(SETTING_PRIMARY_BALANCE_FACTOR, weightFunction.primaryBalance); float threshold = settings.getAsFloat(SETTING_THRESHOLD, BalancedShardsAllocator.this.threshold); if (threshold <= 0.0f) { throw new ElasticSearchIllegalArgumentException("threshold must be greater than 0.0f but was: " + threshold); @@ -87,7 +87,9 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } private volatile WeightFunction weightFunction = new WeightFunction(DEFAULT_INDEX_BALANCE_FACTOR, DEFAULT_SHARD_BALANCE_FACTOR, DEFAULT_PRIMARY_BALANCE_FACTOR); + private volatile float threshold = 1.0f; + public BalancedShardsAllocator(Settings settings) { this(settings, new NodeSettingsService(settings)); @@ -185,25 +187,72 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private final float indexBalance; private final float shardBalance; private final float primaryBalance; + private final EnumMap<Operation, float[]> thetaMap = new EnumMap<Operation, float[]>(Operation.class); public WeightFunction(float indexBalance, float shardBalance, float primaryBalance) { - final float sum = indexBalance + shardBalance + primaryBalance; + float sum = indexBalance + shardBalance + primaryBalance; if (sum <= 0.0f) { throw new ElasticSearchIllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } - this.indexBalance = indexBalance / sum; - this.shardBalance = shardBalance / sum; - this.primaryBalance = primaryBalance / sum; + final float[] defaultTheta = new float[] { shardBalance / sum, indexBalance / sum, primaryBalance / sum }; + for(Operation operation : Operation.values()) { + switch(operation) { + case THRESHOLD_CHECK: + sum = indexBalance + shardBalance; + // no positive shard/index factor to normalize by: keep the default thetas instead of dividing by a non-positive sum (0/0 would be NaN) + thetaMap.put(operation, sum <= 0.0f + ? defaultTheta + 
: new float[] { shardBalance / sum, indexBalance / sum, 0 }); + break; + case BALANCE: + case ALLOCATE: + case MOVE: + thetaMap.put(operation, defaultTheta); + break; + default: + assert false; + } + } + this.indexBalance = indexBalance; + this.shardBalance = shardBalance; + this.primaryBalance = primaryBalance; } - public float weight(Balancer balancer, ModelNode node, String index) { - final float weightShard = shardBalance * (node.numShards() - balancer.avgShardsPerNode()); - final float weightIndex = indexBalance * (node.numShards(index) - balancer.avgShardsPerNode(index)); - final float weightPrimary = primaryBalance * (node.numPrimaries() - balancer.avgPrimariesPerNode()); - return weightShard + weightIndex + weightPrimary; + public float weight(Operation operation, Balancer balancer, ModelNode node, String index) { + final float weightShard = (node.numShards() - balancer.avgShardsPerNode()); + final float weightIndex = (node.numShards(index) - balancer.avgShardsPerNode(index)); + final float weightPrimary = (node.numPrimaries() - balancer.avgPrimariesPerNode()); + final float[] theta = thetaMap.get(operation); + assert theta != null; + return theta[0] * weightShard + theta[1] * weightIndex + theta[2] * weightPrimary; } } + + /** + * An enum that denotes the actual operation the {@link WeightFunction} is + * applied to. + */ + public static enum Operation { + /** + * Provided during balance operations. + */ + BALANCE, + /** + * Provided during initial allocation operation for unassigned shards. + */ + ALLOCATE, + /** + * Provided during move operation. + */ + MOVE, + /** + * Provided when the weight delta is checked against the configured threshold. + * This can be used to ignore tie-breaking weight factors that should not + * solely trigger a relocation unless the delta is above the threshold. 
+ */ + THRESHOLD_CHECK + } /** * A {@link Balancer} @@ -218,7 +267,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private final float threshold; private final MetaData metaData; - + private final Predicate assignedFilter = new Predicate() { @Override public boolean apply(MutableShardRouting input) { @@ -226,6 +275,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } }; + public Balancer(ESLogger logger, RoutingAllocation allocation, WeightFunction weight, float threshold) { this.logger = logger; this.allocation = allocation; @@ -325,8 +375,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards boolean changed = initialize(allocation.routingNodes()); NodeSorter sorter = newNodeSorter(); if (nodes.size() > 1) { /* skip if we only have one node */ - for (String index : buildWeightOrderedIndidces(sorter)) { - sorter.reset(index); + for (String index : buildWeightOrderedIndidces(Operation.BALANCE, sorter)) { + sorter.reset(Operation.BALANCE,index); final float[] weights = sorter.weights; final ModelNode[] modelNodes = sorter.modelNodes; int lowIdx = 0; @@ -336,8 +386,10 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final ModelNode maxNode = modelNodes[highIdx]; if (maxNode.numShards(index) > 0) { float delta = weights[highIdx] - weights[lowIdx]; + delta = delta <= threshold ? 
delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode); if (delta <= threshold) { if (logger.isTraceEnabled()) { + logger.trace("Stop balancing index [{}] min_node [{}] weight: [{}] max_node [{}] weight: [{}] delta: [{}]", index, maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); } @@ -349,14 +401,14 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards } /* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes. * a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */ - if (tryRelocateShard(minNode, maxNode, index, delta)) { + if (tryRelocateShard(Operation.BALANCE, minNode, maxNode, index, delta)) { /* * TODO we could be a bit smarter here, we don't need to fully sort necessarily * we could just find the place to insert linearly but the win might be minor * compared to the added complexity */ - weights[lowIdx] = sorter.weight(modelNodes[lowIdx]); - weights[highIdx] = sorter.weight(modelNodes[highIdx]); + weights[lowIdx] = sorter.weight(Operation.BALANCE, modelNodes[lowIdx]); + weights[highIdx] = sorter.weight(Operation.BALANCE, modelNodes[highIdx]); sorter.quickSort(0, weights.length - 1); lowIdx = 0; highIdx = weights.length - 1; @@ -397,11 +449,11 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * average. To re-balance we need to move shards back eventually likely * to the nodes we relocated them from. 
*/ - private String[] buildWeightOrderedIndidces(NodeSorter sorter) { + private String[] buildWeightOrderedIndidces(Operation operation, NodeSorter sorter) { final String[] indices = this.indices.toArray(new String[this.indices.size()]); final float[] deltas = new float[indices.length]; for (int i = 0; i < deltas.length; i++) { - sorter.reset(indices[i]); + sorter.reset(operation, indices[i]); deltas[i] = sorter.delta(); } new SorterTemplate() { @@ -459,7 +511,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards final ModelNode sourceNode = nodes.get(node.nodeId()); assert sourceNode != null; final NodeSorter sorter = newNodeSorter(); - sorter.reset(shard.getIndex()); + sorter.reset(Operation.MOVE, shard.getIndex()); final ModelNode[] nodes = sorter.modelNodes; assert sourceNode.containsShard(shard); /* @@ -577,7 +629,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards */ if (!node.containsShard(shard)) { node.addShard(shard, Decision.ALWAYS); - float currentWeight = weight.weight(this, node, shard.index()); + float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index()); /* * Remove the shard from the node again this is only a * simulation @@ -660,7 +712,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * balance model. Iff this method returns a true the relocation has already been executed on the * simulation model as well as on the cluster. 
*/ - private boolean tryRelocateShard(ModelNode minNode, ModelNode maxNode, String idx, float minCost) { + private boolean tryRelocateShard(Operation operation, ModelNode minNode, ModelNode maxNode, String idx, float minCost) { final ModelIndex index = maxNode.getIndex(idx); if (index != null) { if (logger.isTraceEnabled()) { @@ -684,7 +736,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards Decision srcDecision; if ((srcDecision = maxNode.removeShard(shard)) != null) { minNode.addShard(shard, srcDecision); - final float delta = weight.weight(this, minNode, idx) - weight.weight(this, maxNode, idx); + final float delta = weight.weight(operation, this, minNode, idx) - weight.weight(operation, this, maxNode, idx); if (delta < minCost) { minCost = delta; candidate = shard; @@ -920,7 +972,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards private float pivotWeight; public NodeSorter(ModelNode[] modelNodes, WeightFunction function, Balancer balancer) { - this.function = function; this.balancer = balancer; this.modelNodes = modelNodes; @@ -931,16 +982,16 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards * Resets the sorter, recalculates the weights per node and sorts the * nodes by weight, with minimal weight first. 
*/ - public void reset(String index) { + public void reset(Operation operation, String index) { this.index = index; for (int i = 0; i < weights.length; i++) { - weights[i] = weight(modelNodes[i]); + weights[i] = weight(operation, modelNodes[i]); } quickSort(0, modelNodes.length - 1); } - public float weight(ModelNode node) { - return function.weight(balancer, node, index); + public float weight(Operation operation, ModelNode node) { + return function.weight(operation, balancer, node, index); } @Override diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java index 87425ba16ad..fcdd2db509a 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/BalanceConfigurationTests.java @@ -30,22 +30,33 @@ import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilde import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode; import static org.hamcrest.MatcherAssert.assertThat; +import java.util.List; + import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.routing.MutableShardRouting; import org.elasticsearch.cluster.routing.RoutingNode; import org.elasticsearch.cluster.routing.RoutingNodes; import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.AllocationService; +import 
org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; +import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocator; +import org.elasticsearch.cluster.routing.allocation.allocator.ShardsAllocators; +import org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders; import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.gateway.none.NoneGatewayAllocator; import org.elasticsearch.node.settings.NodeSettingsService; -import org.elasticsearch.node.settings.NodeSettingsService.Listener; import org.hamcrest.Matchers; import org.testng.annotations.Test; @@ -353,4 +364,163 @@ public class BalanceConfigurationTests { assertThat(allocator.getThreshold(), Matchers.equalTo(3.0f)); } + @Test + public void testNoRebalanceOnPrimaryOverload() { + + ImmutableSettings.Builder settings = settingsBuilder(); + AllocationService strategy = new AllocationService(settings.build(), new AllocationDeciders(settings.build(), + new NodeSettingsService(ImmutableSettings.Builder.EMPTY_SETTINGS)), new ShardsAllocators(settings.build(), + new NoneGatewayAllocator(), new ShardsAllocator() { + + @Override + public boolean rebalance(RoutingAllocation allocation) { + return false; + } + + @Override + public boolean move(MutableShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { + return false; + } + + @Override + public void applyStartedShards(StartedRerouteAllocation allocation) { + + + } + + @Override + public void applyFailedShards(FailedRerouteAllocation allocation) { + } + 
+ /* + * // this allocator tries to rebuild this scenario where a rebalance is + * // triggered solely by the primary overload on node [1] where a shard + * // is rebalanced to node 0 + routing_nodes: + -----node_id[0][V] + --------[test][0], node[0], [R], s[STARTED] + --------[test][4], node[0], [R], s[STARTED] + -----node_id[1][V] + --------[test][0], node[1], [P], s[STARTED] + --------[test][1], node[1], [P], s[STARTED] + --------[test][3], node[1], [R], s[STARTED] + -----node_id[2][V] + --------[test][1], node[2], [R], s[STARTED] + --------[test][2], node[2], [R], s[STARTED] + --------[test][4], node[2], [P], s[STARTED] + -----node_id[3][V] + --------[test][2], node[3], [P], s[STARTED] + --------[test][3], node[3], [P], s[STARTED] + ---- unassigned + */ + @Override + public boolean allocateUnassigned(RoutingAllocation allocation) { + List unassigned = allocation.routingNodes().unassigned(); + boolean changed = !unassigned.isEmpty(); + for (MutableShardRouting sr : unassigned) { + switch (sr.id()) { + case 0: + if (sr.primary()) { + allocation.routingNodes().node("node1").add(sr); + } else { + allocation.routingNodes().node("node0").add(sr); + } + break; + case 1: + if (sr.primary()) { + allocation.routingNodes().node("node1").add(sr); + } else { + allocation.routingNodes().node("node2").add(sr); + } + break; + case 2: + if (sr.primary()) { + allocation.routingNodes().node("node3").add(sr); + } else { + allocation.routingNodes().node("node2").add(sr); + } + break; + case 3: + if (sr.primary()) { + allocation.routingNodes().node("node3").add(sr); + } else { + allocation.routingNodes().node("node1").add(sr); + } + break; + case 4: + if (sr.primary()) { + allocation.routingNodes().node("node2").add(sr); + } else { + allocation.routingNodes().node("node0").add(sr); + } + break; + } + + } + unassigned.clear(); + return changed; + } + })); + MetaData.Builder metaDataBuilder = newMetaDataBuilder(); + RoutingTable.Builder routingTableBuilder = routingTable(); + 
IndexMetaData.Builder indexMeta = newIndexMetaDataBuilder("test").numberOfShards(5).numberOfReplicas(1); + metaDataBuilder = metaDataBuilder.put(indexMeta); + MetaData metaData = metaDataBuilder.build(); + for (IndexMetaData index : metaData.indices().values()) { + routingTableBuilder.addAsNew(index); + } + RoutingTable routingTable = routingTableBuilder.build(); + DiscoveryNodes.Builder nodes = newNodesBuilder(); + for (int i = 0; i < 4; i++) { + DiscoveryNode node = newNode("node"+i); + nodes.put(node); + } + + ClusterState clusterState = newClusterStateBuilder().nodes(nodes).metaData(metaData).routingTable(routingTable).build(); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + RoutingNodes routingNodes = clusterState.routingNodes(); + + for (RoutingNode routingNode : routingNodes) { + for (MutableShardRouting mutableShardRouting : routingNode) { + assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.INITIALIZING)); + } + } + strategy = new AllocationService(settings.build()); + + logger.info("use the new allocator and check if it moves shards"); + routingNodes = clusterState.routingNodes(); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + for (RoutingNode routingNode : routingNodes) { + for (MutableShardRouting mutableShardRouting : routingNode) { + assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED)); + } + } + + logger.info("start the replica shards"); + routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = 
clusterState.routingNodes(); + + for (RoutingNode routingNode : routingNodes) { + for (MutableShardRouting mutableShardRouting : routingNode) { + assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED)); + } + } + + logger.info("rebalancing"); + routingTable = strategy.reroute(clusterState).routingTable(); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(routingTable).build(); + routingNodes = clusterState.routingNodes(); + + for (RoutingNode routingNode : routingNodes) { + for (MutableShardRouting mutableShardRouting : routingNode) { + assertThat(mutableShardRouting.state(), Matchers.equalTo(ShardRoutingState.STARTED)); + } + } + + } + } \ No newline at end of file