Use a tolerance to decide if a value is less than the threshold

Adding a small value to the threshold prevents weight deltas that are
very very close to the threshold to not trigger relocations. These
deltas can be rounding errors that lead to unnecessary relocations. In
practice this might only happen under very rare circumstances.
In general it's a good idea for the shard allocator to be a bit
more conversavtive in terms of rebalancing since in general relocation
costs are pretty high.

Closes #4630
This commit is contained in:
Simon Willnauer 2014-01-06 12:35:40 +01:00
parent 3240cc8145
commit 80de40f195
2 changed files with 29 additions and 25 deletions

View File

@ -24,7 +24,10 @@ import org.apache.lucene.util.ArrayUtil;
import org.apache.lucene.util.IntroSorter;
import org.elasticsearch.ElasticSearchIllegalArgumentException;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation;
@ -345,6 +348,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
return allocateUnassigned(unassigned, routing.ignoredUnassigned());
}
private static boolean lessThan(float delta, float threshold) {
/* deltas close to the threshold are "rounded" to the threshold manually
to prevent floating point problems if the delta is very close to the
threshold ie. 1.000000002 which can trigger unnecessary balance actions*/
return delta <= threshold + 0.001f;
}
/**
* Balances the nodes on the cluster model according to the weight
* function. The configured threshold is the minimum delta between the
@ -384,8 +394,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
advance_range:
if (maxNode.numShards(index) > 0) {
float delta = weights[highIdx] - weights[lowIdx];
delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
if (delta <= threshold) {
delta = lessThan(delta, threshold) ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode);
if (lessThan(delta, threshold)) {
if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta?
&& (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all
) {
@ -412,7 +422,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta);
}
/* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes.
* a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */
* a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */
if (tryRelocateShard(Operation.BALANCE, minNode, maxNode, index, delta)) {
/*
* TODO we could be a bit smarter here, we don't need to fully sort necessarily

View File

@ -34,7 +34,9 @@ import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.test.ElasticsearchAllocationTestCase;
import org.junit.Test;
import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
@ -209,7 +211,7 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe
nodesBuilder.put(node);
}
clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build();
clusterState = stabelize(clusterState, service);
clusterState = stabilize(clusterState, service);
}
}
@ -247,28 +249,28 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe
.put(newNode("old0", getPreviousVersion()))
.put(newNode("old1", getPreviousVersion()))
.put(newNode("old2", getPreviousVersion()))).build();
clusterState = stabelize(clusterState, service);
clusterState = stabilize(clusterState, service);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("old0", getPreviousVersion()))
.put(newNode("old1", getPreviousVersion()))
.put(newNode("new0"))).build();
clusterState = stabelize(clusterState, service);
clusterState = stabilize(clusterState, service);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("node0", getPreviousVersion()))
.put(newNode("new1"))
.put(newNode("new0"))).build();
clusterState = stabelize(clusterState, service);
clusterState = stabilize(clusterState, service);
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder()
.put(newNode("new2"))
.put(newNode("new1"))
.put(newNode("new0"))).build();
clusterState = stabelize(clusterState, service);
clusterState = stabilize(clusterState, service);
routingTable = clusterState.routingTable();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3));
@ -281,7 +283,7 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe
}
}
private ClusterState stabelize(ClusterState clusterState, AllocationService service) {
private ClusterState stabilize(ClusterState clusterState, AllocationService service) {
logger.trace("RoutingNodes: {}", clusterState.routingNodes().prettyPrint());
RoutingTable routingTable = service.reroute(clusterState).routingTable();
@ -289,30 +291,22 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe
RoutingNodes routingNodes = clusterState.routingNodes();
assertRecoveryNodeVersions(routingNodes);
logger.info("start all the primary shards, replicas will start initializing");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
assertRecoveryNodeVersions(routingNodes);
logger.info("start the replica shards");
routingNodes = clusterState.routingNodes();
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
boolean stable = false;
for (int i = 0; i < 1000; i++) { // at most 200 iters - this should be enough for all tests
logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint());
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
if (routingTable == prev)
if (stable = (routingTable == prev)) {
break;
}
assertRecoveryNodeVersions(routingNodes);
prev = routingTable;
}
logger.info("stabilized success [{}]", stable);
assertThat(stable, is(true));
return clusterState;
}