diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index 0d79392cdda..01a7361c5ab 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -24,7 +24,10 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.IntroSorter; import org.elasticsearch.ElasticSearchIllegalArgumentException; import org.elasticsearch.cluster.metadata.MetaData; -import org.elasticsearch.cluster.routing.*; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.RoutingNodes; +import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.allocation.FailedRerouteAllocation; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; import org.elasticsearch.cluster.routing.allocation.StartedRerouteAllocation; @@ -345,6 +348,13 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards return allocateUnassigned(unassigned, routing.ignoredUnassigned()); } + private static boolean lessThan(float delta, float threshold) { + /* deltas close to the threshold are "rounded" to the threshold manually + to prevent floating point problems if the delta is very close to the + threshold ie. 1.000000002 which can trigger unnecessary balance actions*/ + return delta <= threshold + 0.001f; + } + /** * Balances the nodes on the cluster model according to the weight * function. 
The configured threshold is the minimum delta between the @@ -384,8 +394,8 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards advance_range: if (maxNode.numShards(index) > 0) { float delta = weights[highIdx] - weights[lowIdx]; - delta = delta <= threshold ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode); - if (delta <= threshold) { + delta = lessThan(delta, threshold) ? delta : sorter.weight(Operation.THRESHOLD_CHECK, maxNode) - sorter.weight(Operation.THRESHOLD_CHECK, minNode); + if (lessThan(delta, threshold)) { if (lowIdx > 0 && highIdx-1 > 0 // is there a chance for a higher delta? && (weights[highIdx-1] - weights[0] > threshold) // check if we need to break at all ) { @@ -412,7 +422,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards maxNode.getNodeId(), weights[highIdx], minNode.getNodeId(), weights[lowIdx], delta); } /* pass the delta to the replication function to prevent relocations that only swap the weights of the two nodes. 
- * a relocation must bring us closer to the balance if we only achive the same delta the relocation is useless */ + * a relocation must bring us closer to the balance if we only achieve the same delta the relocation is useless */ if (tryRelocateShard(Operation.BALANCE, minNode, maxNode, index, delta)) { /* * TODO we could be a bit smarter here, we don't need to fully sort necessarily diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java index 4ac84932e98..55cb1e87926 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/NodeVersionAllocationDeciderTests.java @@ -34,7 +34,9 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.test.ElasticsearchAllocationTestCase; import org.junit.Test; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import static org.elasticsearch.cluster.routing.ShardRoutingState.*; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; @@ -209,7 +211,7 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe nodesBuilder.put(node); } clusterState = ClusterState.builder(clusterState).nodes(nodesBuilder).build(); - clusterState = stabelize(clusterState, service); + clusterState = stabilize(clusterState, service); } } @@ -247,28 +249,28 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe .put(newNode("old0", getPreviousVersion())) .put(newNode("old1", getPreviousVersion())) .put(newNode("old2", getPreviousVersion()))).build(); - clusterState = stabelize(clusterState, service); + clusterState = stabilize(clusterState, service); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() 
.put(newNode("old0", getPreviousVersion())) .put(newNode("old1", getPreviousVersion())) .put(newNode("new0"))).build(); - clusterState = stabelize(clusterState, service); + clusterState = stabilize(clusterState, service); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() .put(newNode("node0", getPreviousVersion())) .put(newNode("new1")) .put(newNode("new0"))).build(); - clusterState = stabelize(clusterState, service); + clusterState = stabilize(clusterState, service); clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder() .put(newNode("new2")) .put(newNode("new1")) .put(newNode("new0"))).build(); - clusterState = stabelize(clusterState, service); + clusterState = stabilize(clusterState, service); routingTable = clusterState.routingTable(); for (int i = 0; i < routingTable.index("test").shards().size(); i++) { assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(3)); @@ -281,7 +283,7 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe } } - private ClusterState stabelize(ClusterState clusterState, AllocationService service) { + private ClusterState stabilize(ClusterState clusterState, AllocationService service) { logger.trace("RoutingNodes: {}", clusterState.routingNodes().prettyPrint()); RoutingTable routingTable = service.reroute(clusterState).routingTable(); @@ -289,30 +291,22 @@ public class NodeVersionAllocationDeciderTests extends ElasticsearchAllocationTe RoutingNodes routingNodes = clusterState.routingNodes(); assertRecoveryNodeVersions(routingNodes); - logger.info("start all the primary shards, replicas will start initializing"); - routingNodes = clusterState.routingNodes(); - routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - routingNodes = clusterState.routingNodes(); - 
assertRecoveryNodeVersions(routingNodes); - - logger.info("start the replica shards"); - routingNodes = clusterState.routingNodes(); - routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); - clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); - routingNodes = clusterState.routingNodes(); logger.info("complete rebalancing"); RoutingTable prev = routingTable; - while (true) { + boolean stable = false; + for (int i = 0; i < 1000; i++) { // at most 1000 iters - this should be enough for all tests logger.trace("RoutingNodes: {}", clusterState.getRoutingNodes().prettyPrint()); routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); - if (routingTable == prev) + if (stable = (routingTable == prev)) { break; + } assertRecoveryNodeVersions(routingNodes); prev = routingTable; } + logger.info("stabilized success [{}]", stable); + assertThat(stable, is(true)); return clusterState; }