Make BalancedAllocationDecider assignments deterministic

a previous change introduces an identity hashset that has non-deterministic
iteration order which kill the reproducibility of our unittests if they fail.
This patch adds back deterministic allocations.
This commit is contained in:
Simon Willnauer 2013-12-18 16:08:03 +01:00
parent 52db8eb324
commit d8dee92f98
2 changed files with 68 additions and 53 deletions

View File

@ -635,7 +635,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
int secondaryLength = 0;
int primaryLength = primary.length;
ArrayUtil.timSort(primary, comparator);
final Set<ModelNode> values = new IdentityHashSet<ModelNode>(nodes.values());
final Set<ModelNode> throttledNodes = new IdentityHashSet<ModelNode>();
do {
for (int i = 0; i < primaryLength; i++) {
MutableShardRouting shard = primary[i];
@ -658,60 +658,67 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
float minWeight = Float.POSITIVE_INFINITY;
ModelNode minNode = null;
Decision decision = null;
for (ModelNode node : values) {
/*
* The shard we add is removed below to simulate the
* addition for weight calculation we use Decision.ALWAYS to
* not violate the not null condition.
*/
if (!node.containsShard(shard)) {
node.addShard(shard, Decision.ALWAYS);
float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index());
if (throttledNodes.size() < nodes.size()) {
/* Don't iterate over an identity hashset here the
* iteration order is different for each run and makes testing hard */
for (ModelNode node : nodes.values()) {
if (throttledNodes.contains(node)) {
continue;
}
/*
* Remove the shard from the node again this is only a
* simulation
*/
Decision removed = node.removeShard(shard);
assert removed != null;
/*
* Unless the operation is not providing any gains we
* don't check deciders
*/
if (currentWeight <= minWeight) {
Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation);
NOUPDATE:
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
if (currentWeight == minWeight) {
/* we have an equal weight tie breaking:
* 1. if one decision is YES prefer it
* 2. prefer the node that holds the primary for this index with the next id in the ring ie.
* for the 3 shards 2 replica case we try to build up:
* 1 2 0
* 2 0 1
* 0 1 2
* such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater
* than the id of the shard we need to assign. This works find when new indices are created since
* primaries are added first and we only add one shard set a time in this algorithm.
*/
if (currentDecision.type() == decision.type()) {
final int repId = shard.id();
final int nodeHigh = node.highestPrimary(shard.index());
final int minNodeHigh = minNode.highestPrimary(shard.index());
if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh))
|| (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) {
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
} else {
* The shard we add is removed below to simulate the
* addition for weight calculation we use Decision.ALWAYS to
* not violate the not null condition.
*/
if (!node.containsShard(shard)) {
node.addShard(shard, Decision.ALWAYS);
float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index());
/*
* Remove the shard from the node again this is only a
* simulation
*/
Decision removed = node.removeShard(shard);
assert removed != null;
/*
* Unless the operation is not providing any gains we
* don't check deciders
*/
if (currentWeight <= minWeight) {
Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation);
NOUPDATE:
if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) {
if (currentWeight == minWeight) {
/* we have an equal weight tie breaking:
* 1. if one decision is YES prefer it
* 2. prefer the node that holds the primary for this index with the next id in the ring ie.
* for the 3 shards 2 replica case we try to build up:
* 1 2 0
* 2 0 1
* 0 1 2
* such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater
* than the id of the shard we need to assign. This works find when new indices are created since
* primaries are added first and we only add one shard set a time in this algorithm.
*/
if (currentDecision.type() == decision.type()) {
final int repId = shard.id();
final int nodeHigh = node.highestPrimary(shard.index());
final int minNodeHigh = minNode.highestPrimary(shard.index());
if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh))
|| (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) {
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
} else {
break NOUPDATE;
}
} else if (currentDecision.type() != Type.YES) {
break NOUPDATE;
}
} else if (currentDecision.type() != Type.YES) {
break NOUPDATE;
}
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
}
minNode = node;
minWeight = currentWeight;
decision = currentDecision;
}
}
}
@ -732,7 +739,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Can not allocate on node [{}] remove from round decisin [{}]", node, decision.type());
}
values.remove(minNode);
throttledNodes.add(minNode);
}
}
if (logger.isTraceEnabled()) {

View File

@ -1,6 +1,7 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.cursors.ObjectCursor;
import com.google.common.collect.Lists;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
@ -18,6 +19,9 @@ import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Collections;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode;
@ -308,6 +312,7 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase {
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -352,6 +357,7 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase {
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();
@ -366,8 +372,9 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase {
private ClusterState removeNodes(ClusterState clusterState, AllocationService service, int numNodes) {
logger.info("Removing [{}] nodes", numNodes);
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes());
for (DiscoveryNode node : clusterState.nodes()) {
ArrayList<DiscoveryNode> discoveryNodes = Lists.newArrayList(clusterState.nodes());
Collections.shuffle(discoveryNodes, getRandom());
for (DiscoveryNode node : discoveryNodes) {
nodes.remove(node.id());
numNodes--;
if (numNodes <= 0) {
@ -396,6 +403,7 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase {
logger.info("complete rebalancing");
RoutingTable prev = routingTable;
while (true) {
logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint());
routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
routingNodes = clusterState.routingNodes();