From d8dee92f989197afb12cef8eb1edd74756c3dc95 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 18 Dec 2013 16:08:03 +0100 Subject: [PATCH] Make BalancedAllocationDecider assignments deterministic a previous change introduced an identity hashset that has non-deterministic iteration order, which kills the reproducibility of our unittests if they fail. This patch adds back deterministic allocations. --- .../allocator/BalancedShardsAllocator.java | 109 ++++++++++-------- .../allocation/AddIncrementallyTests.java | 12 +- 2 files changed, 68 insertions(+), 53 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index c2bb966ced7..7bd7ea9bbaf 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -635,7 +635,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards int secondaryLength = 0; int primaryLength = primary.length; ArrayUtil.timSort(primary, comparator); - final Set values = new IdentityHashSet(nodes.values()); + final Set throttledNodes = new IdentityHashSet(); do { for (int i = 0; i < primaryLength; i++) { MutableShardRouting shard = primary[i]; @@ -658,60 +658,67 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards float minWeight = Float.POSITIVE_INFINITY; ModelNode minNode = null; Decision decision = null; - for (ModelNode node : values) { - /* - * The shard we add is removed below to simulate the - * addition for weight calculation we use Decision.ALWAYS to - * not violate the not null condition. 
- */ - if (!node.containsShard(shard)) { - node.addShard(shard, Decision.ALWAYS); - float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index()); + if (throttledNodes.size() < nodes.size()) { + /* Don't iterate over an identity hashset here the + * iteration order is different for each run and makes testing hard */ + for (ModelNode node : nodes.values()) { + if (throttledNodes.contains(node)) { + continue; + } /* - * Remove the shard from the node again this is only a - * simulation - */ - Decision removed = node.removeShard(shard); - assert removed != null; - /* - * Unless the operation is not providing any gains we - * don't check deciders - */ - if (currentWeight <= minWeight) { - Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation); - NOUPDATE: - if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { - if (currentWeight == minWeight) { - /* we have an equal weight tie breaking: - * 1. if one decision is YES prefer it - * 2. prefer the node that holds the primary for this index with the next id in the ring ie. - * for the 3 shards 2 replica case we try to build up: - * 1 2 0 - * 2 0 1 - * 0 1 2 - * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater - * than the id of the shard we need to assign. This works find when new indices are created since - * primaries are added first and we only add one shard set a time in this algorithm. 
- */ - if (currentDecision.type() == decision.type()) { - final int repId = shard.id(); - final int nodeHigh = node.highestPrimary(shard.index()); - final int minNodeHigh = minNode.highestPrimary(shard.index()); - if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh)) - || (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) { - minNode = node; - minWeight = currentWeight; - decision = currentDecision; - } else { + * The shard we add is removed below to simulate the + * addition for weight calculation we use Decision.ALWAYS to + * not violate the not null condition. + */ + if (!node.containsShard(shard)) { + node.addShard(shard, Decision.ALWAYS); + float currentWeight = weight.weight(Operation.ALLOCATE, this, node, shard.index()); + /* + * Remove the shard from the node again this is only a + * simulation + */ + Decision removed = node.removeShard(shard); + assert removed != null; + /* + * Unless the operation is not providing any gains we + * don't check deciders + */ + if (currentWeight <= minWeight) { + Decision currentDecision = deciders.canAllocate(shard, routingNodes.node(node.getNodeId()), allocation); + NOUPDATE: + if (currentDecision.type() == Type.YES || currentDecision.type() == Type.THROTTLE) { + if (currentWeight == minWeight) { + /* we have an equal weight tie breaking: + * 1. if one decision is YES prefer it + * 2. prefer the node that holds the primary for this index with the next id in the ring ie. + * for the 3 shards 2 replica case we try to build up: + * 1 2 0 + * 2 0 1 + * 0 1 2 + * such that if we need to tie-break we try to prefer the node holding a shard with the minimal id greater + * than the id of the shard we need to assign. This works find when new indices are created since + * primaries are added first and we only add one shard set a time in this algorithm. 
+ */ + if (currentDecision.type() == decision.type()) { + final int repId = shard.id(); + final int nodeHigh = node.highestPrimary(shard.index()); + final int minNodeHigh = minNode.highestPrimary(shard.index()); + if ((((nodeHigh > repId && minNodeHigh > repId) || (nodeHigh < repId && minNodeHigh < repId)) && (nodeHigh < minNodeHigh)) + || (nodeHigh > minNodeHigh && nodeHigh > repId && minNodeHigh < repId)) { + minNode = node; + minWeight = currentWeight; + decision = currentDecision; + } else { + break NOUPDATE; + } + } else if (currentDecision.type() != Type.YES) { break NOUPDATE; } - } else if (currentDecision.type() != Type.YES) { - break NOUPDATE; } + minNode = node; + minWeight = currentWeight; + decision = currentDecision; } - minNode = node; - minWeight = currentWeight; - decision = currentDecision; } } } @@ -732,7 +739,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards if (logger.isTraceEnabled()) { logger.trace("Can not allocate on node [{}] remove from round decisin [{}]", node, decision.type()); } - values.remove(minNode); + throttledNodes.add(minNode); } } if (logger.isTraceEnabled()) { diff --git a/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java b/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java index 3e9bf274fff..2bed1164312 100644 --- a/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java +++ b/src/test/java/org/elasticsearch/cluster/routing/allocation/AddIncrementallyTests.java @@ -1,6 +1,7 @@ package org.elasticsearch.cluster.routing.allocation; import com.carrotsearch.hppc.cursors.ObjectCursor; +import com.google.common.collect.Lists; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.MetaData; @@ -18,6 +19,9 @@ import org.hamcrest.Matcher; import org.hamcrest.Matchers; import org.junit.Test; +import 
java.util.ArrayList; +import java.util.Collections; + import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING; import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED; import static org.elasticsearch.cluster.routing.allocation.RoutingAllocationTests.newNode; @@ -308,6 +312,7 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase { logger.info("complete rebalancing"); RoutingTable prev = routingTable; while (true) { + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -352,6 +357,7 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase { logger.info("complete rebalancing"); RoutingTable prev = routingTable; while (true) { + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes(); @@ -366,8 +372,9 @@ public class AddIncrementallyTests extends ElasticsearchAllocationTestCase { private ClusterState removeNodes(ClusterState clusterState, AllocationService service, int numNodes) { logger.info("Removing [{}] nodes", numNodes); DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(clusterState.nodes()); - - for (DiscoveryNode node : clusterState.nodes()) { + ArrayList discoveryNodes = Lists.newArrayList(clusterState.nodes()); + Collections.shuffle(discoveryNodes, getRandom()); + for (DiscoveryNode node : discoveryNodes) { nodes.remove(node.id()); numNodes--; if (numNodes <= 0) { @@ -396,6 +403,7 @@ public class AddIncrementallyTests extends 
ElasticsearchAllocationTestCase { logger.info("complete rebalancing"); RoutingTable prev = routingTable; while (true) { + logger.debug("ClusterState: {}", clusterState.getRoutingNodes().prettyPrint()); routingTable = service.applyStartedShards(clusterState, routingNodes.shardsWithState(INITIALIZING)).routingTable(); clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build(); routingNodes = clusterState.routingNodes();