Take ingored unallocated shards into account when makeing allocation decision

ClusterRebalanceAllocationDecider did not take unassigned shards into account
that are temporarily marked as ingored. This can cause unexpected behavior
when gateway allocator is still fetching shards or has marked shareds as ignored
since their quorum is not met yet.

Closes #14670
Closes #14678
This commit is contained in:
Simon Willnauer 2015-11-11 11:49:44 +01:00 committed by Boaz Leskes
parent fb10e59578
commit 044b3d59cd
10 changed files with 202 additions and 91 deletions

View File

@ -167,7 +167,7 @@ public class ShardStateAction extends AbstractComponent {
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
if (oldState != newState && newState.getRoutingNodes().hasUnassigned()) {
if (oldState != newState && newState.getRoutingNodes().unassigned().size() > 0) {
logger.trace("unassigned shards after shard failures. scheduling a reroute.");
routingService.reroute("unassigned shards after shard failures, scheduling a reroute");
}

View File

@ -183,13 +183,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return this.customs;
}
public <T extends ClusterState.Custom> T custom(String type) {
return (T) customs.get(type);
}
public boolean hasUnassigned() {
return !unassignedShards.isEmpty();
}
public <T extends ClusterState.Custom> T custom(String type) { return (T) customs.get(type); }
public UnassignedShards unassigned() {
return this.unassignedShards;
@ -217,12 +211,22 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return nodesPerAttributesCounts;
}
/**
* Returns <code>true</code> iff this {@link RoutingNodes} instance has any unassigned primaries even if the
* primaries are marked as temporarily ignored.
*/
public boolean hasUnassignedPrimaries() {
return unassignedShards.numPrimaries() > 0;
return unassignedShards.getNumPrimaries() + unassignedShards.getNumIgnoredPrimaries() > 0;
}
/**
* Returns <code>true</code> iff this {@link RoutingNodes} instance has any unassigned shards even if the
* shards are marked as temporarily ignored.
* @see UnassignedShards#isEmpty()
* @see UnassignedShards#isIgnoredEmpty()
*/
public boolean hasUnassignedShards() {
return !unassignedShards.isEmpty();
return unassignedShards.isEmpty() == false || unassignedShards.isIgnoredEmpty() == false;
}
public boolean hasInactivePrimaries() {
@ -524,25 +528,12 @@ public class RoutingNodes implements Iterable<RoutingNode> {
private final List<ShardRouting> ignored;
private int primaries = 0;
private long transactionId = 0;
private final UnassignedShards source;
private final long sourceTransactionId;
public UnassignedShards(UnassignedShards other) {
this.nodes = other.nodes;
source = other;
sourceTransactionId = other.transactionId;
unassigned = new ArrayList<>(other.unassigned);
ignored = new ArrayList<>(other.ignored);
primaries = other.primaries;
}
private int ignoredPrimaries = 0;
public UnassignedShards(RoutingNodes nodes) {
this.nodes = nodes;
unassigned = new ArrayList<>();
ignored = new ArrayList<>();
source = null;
sourceTransactionId = -1;
}
public void add(ShardRouting shardRouting) {
@ -550,21 +541,34 @@ public class RoutingNodes implements Iterable<RoutingNode> {
primaries++;
}
unassigned.add(shardRouting);
transactionId++;
}
public void sort(Comparator<ShardRouting> comparator) {
CollectionUtil.timSort(unassigned, comparator);
}
public int size() {
return unassigned.size();
}
/**
* Returns the size of the non-ignored unassigned shards
*/
public int size() { return unassigned.size(); }
public int numPrimaries() {
/**
* Returns the size of the temporarily marked as ignored unassigned shards
*/
public int ignoredSize() { return ignored.size(); }
/**
* Returns the number of non-ignored unassigned primaries
*/
public int getNumPrimaries() {
return primaries;
}
/**
* Returns the number of temporarily marked as ignored unassigned primaries
*/
public int getNumIgnoredPrimaries() { return ignoredPrimaries; }
@Override
public UnassignedIterator iterator() {
return new UnassignedIterator();
@ -580,12 +584,18 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
/**
* Adds a shard to the ignore unassigned list. Should be used with caution, typically,
* Marks a shard as temporarily ignored and adds it to the ignore unassigned list.
* Should be used with caution, typically,
* the correct usage is to removeAndIgnore from the iterator.
* @see #ignored()
* @see UnassignedIterator#removeAndIgnore()
* @see #isIgnoredEmpty()
*/
public void ignoreShard(ShardRouting shard) {
if (shard.primary()) {
ignoredPrimaries++;
}
ignored.add(shard);
transactionId++;
}
public class UnassignedIterator implements Iterator<ShardRouting> {
@ -618,6 +628,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
/**
* Removes and ignores the unassigned shard (will be ignored for this run, but
* will be added back to unassigned once the metadata is constructed again).
* Typically this is used when an allocation decision prevents a shard from being allocated such
* that subsequent consumers of this API won't try to allocate this shard again.
*/
public void removeAndIgnore() {
innerRemove();
@ -639,45 +651,37 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (current.primary()) {
primaries--;
}
transactionId++;
}
}
/**
* Returns <code>true</code> iff this collection contains one or more non-ignored unassigned shards.
*/
public boolean isEmpty() {
return unassigned.isEmpty();
}
/**
* Returns <code>true</code> iff any unassigned shards are marked as temporarily ignored.
* @see UnassignedShards#ignoreShard(ShardRouting)
* @see UnassignedIterator#removeAndIgnore()
*/
public boolean isIgnoredEmpty() {
return ignored.isEmpty();
}
public void shuffle() {
Collections.shuffle(unassigned);
}
public void clear() {
transactionId++;
unassigned.clear();
ignored.clear();
primaries = 0;
}
public void transactionEnd(UnassignedShards shards) {
assert shards.source == this && shards.sourceTransactionId == transactionId :
"Expected ID: " + shards.sourceTransactionId + " actual: " + transactionId + " Expected Source: " + shards.source + " actual: " + this;
transactionId++;
this.unassigned.clear();
this.unassigned.addAll(shards.unassigned);
this.ignored.clear();
this.ignored.addAll(shards.ignored);
this.primaries = shards.primaries;
}
public UnassignedShards transactionBegin() {
return new UnassignedShards(this);
}
/**
* Drains all unassigned shards and returns it.
* This method will not drain ignored shards.
*/
public ShardRouting[] drain() {
ShardRouting[] mutableShardRoutings = unassigned.toArray(new ShardRouting[unassigned.size()]);
unassigned.clear();
primaries = 0;
transactionId++;
return mutableShardRoutings;
}
}
@ -698,10 +702,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return true;
}
int unassignedPrimaryCount = 0;
int unassignedIgnoredPrimaryCount = 0;
int inactivePrimaryCount = 0;
int inactiveShardCount = 0;
int relocating = 0;
final Set<ShardId> seenShards = new HashSet<>();
Map<String, Integer> indicesAndShards = new HashMap<>();
for (RoutingNode node : routingNodes) {
for (ShardRouting shard : node) {
@ -716,7 +720,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (shard.relocating()) {
relocating++;
}
seenShards.add(shard.shardId());
Integer i = indicesAndShards.get(shard.index());
if (i == null) {
i = shard.id();
@ -751,11 +754,18 @@ public class RoutingNodes implements Iterable<RoutingNode> {
if (shard.primary()) {
unassignedPrimaryCount++;
}
seenShards.add(shard.shardId());
}
assert unassignedPrimaryCount == routingNodes.unassignedShards.numPrimaries() :
"Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + routingNodes.unassigned().numPrimaries() + "]";
for (ShardRouting shard : routingNodes.unassigned().ignored()) {
if (shard.primary()) {
unassignedIgnoredPrimaryCount++;
}
}
assert unassignedPrimaryCount == routingNodes.unassignedShards.getNumPrimaries() :
"Unassigned primaries is [" + unassignedPrimaryCount + "] but RoutingNodes returned unassigned primaries [" + routingNodes.unassigned().getNumPrimaries() + "]";
assert unassignedIgnoredPrimaryCount == routingNodes.unassignedShards.getNumIgnoredPrimaries() :
"Unassigned ignored primaries is [" + unassignedIgnoredPrimaryCount + "] but RoutingNodes returned unassigned ignored primaries [" + routingNodes.unassigned().getNumIgnoredPrimaries() + "]";
assert inactivePrimaryCount == routingNodes.inactivePrimaryCount :
"Inactive Primary count [" + inactivePrimaryCount + "] but RoutingNodes returned inactive primaries [" + routingNodes.inactivePrimaryCount + "]";
assert inactiveShardCount == routingNodes.inactiveShardCount :

View File

@ -176,7 +176,7 @@ public class AllocationService extends AbstractComponent {
changed |= electPrimariesAndUnassignedDanglingReplicas(allocation);
// now allocate all the unassigned to available nodes
if (allocation.routingNodes().hasUnassigned()) {
if (allocation.routingNodes().unassigned().size() > 0) {
changed |= shardsAllocators.allocateUnassigned(allocation);
}
@ -232,7 +232,7 @@ public class AllocationService extends AbstractComponent {
private boolean electPrimariesAndUnassignedDanglingReplicas(RoutingAllocation allocation) {
boolean changed = false;
RoutingNodes routingNodes = allocation.routingNodes();
if (!routingNodes.hasUnassignedPrimaries()) {
if (routingNodes.unassigned().getNumPrimaries() == 0) {
// move out if we don't have unassigned primaries
return changed;
}

View File

@ -353,7 +353,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
logger.trace("Start assigning unassigned shards");
}
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (onlyAssign == false && changed == false && allocation.deciders().canRebalance(allocation).type() == Type.YES) {
NodeSorter sorter = newNodeSorter();
@ -433,7 +433,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
}
}
routingNodes.unassigned().transactionEnd(unassigned);
return changed;
}
@ -508,7 +507,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Try moving shard [{}] from [{}]", shard, node);
}
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned().transactionBegin();
final RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned();
boolean changed = initialize(routingNodes, unassigned);
if (!changed) {
final ModelNode sourceNode = nodes.get(node.nodeId());
@ -544,7 +543,6 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
}
}
}
routingNodes.unassigned().transactionEnd(unassigned);
return changed;
}

View File

@ -51,15 +51,12 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
public static final String NAME = "cluster_rebalance";
public static final String CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE = "cluster.routing.allocation.allow_rebalance";
public static final Validator ALLOCATION_ALLOW_REBALANCE_VALIDATOR = new Validator() {
@Override
public String validate(String setting, String value, ClusterState clusterState) {
try {
ClusterRebalanceType.parseString(value);
return null;
} catch (IllegalArgumentException e) {
return "the value of " + setting + " must be one of: [always, indices_primaries_active, indices_all_active]";
}
public static final Validator ALLOCATION_ALLOW_REBALANCE_VALIDATOR = (setting, value, clusterState) -> {
try {
ClusterRebalanceType.parseString(value);
return null;
} catch (IllegalArgumentException e) {
return "the value of " + setting + " must be one of: [always, indices_primaries_active, indices_all_active]";
}
};
@ -153,7 +150,7 @@ public class ClusterRebalanceAllocationDecider extends AllocationDecider {
}
if (type == ClusterRebalanceType.INDICES_ALL_ACTIVE) {
// check if there are unassigned shards.
if ( allocation.routingNodes().hasUnassignedShards() ) {
if (allocation.routingNodes().hasUnassignedShards() ) {
return allocation.decision(Decision.NO, NAME, "cluster has unassigned shards");
}
// in case all indices are assigned, are there initializing shards which

View File

@ -73,7 +73,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().hasUnassigned(), equalTo(true));
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
@ -119,7 +119,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().hasUnassigned(), equalTo(true));
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));
@ -145,7 +145,7 @@ public class DelayedAllocationIT extends ESIntegTestCase {
assertBusy(new Runnable() {
@Override
public void run() {
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().hasUnassigned(), equalTo(true));
assertThat(client().admin().cluster().prepareState().all().get().getState().getRoutingNodes().unassigned().size() > 0, equalTo(true));
}
});
assertThat(client().admin().cluster().prepareHealth().get().getDelayedUnassignedShards(), equalTo(1));

View File

@ -91,7 +91,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(false));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
@ -120,7 +120,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().hasUnassigned());
assertFalse("no shards should be unassigned", clusterState.getRoutingNodes().unassigned().size() > 0);
String nodeId = null;
final List<ShardRouting> allShards = clusterState.getRoutingNodes().routingTable().allShards("test");
// we need to find the node with the replica otherwise we will not reroute
@ -265,7 +265,7 @@ public class RoutingServiceTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(false));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
ClusterState prevState = clusterState;
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();

View File

@ -213,12 +213,12 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(false));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
// verify that NODE_LEAVE is the reason for meta
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(true));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(true));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.NODE_LEFT));
@ -242,12 +242,12 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(false));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// fail shard
ShardRouting shardToFail = clusterState.getRoutingNodes().shardsWithState(STARTED).get(0);
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyFailedShards(clusterState, Collections.singletonList(new FailedRerouteAllocation.FailedShard(shardToFail, "test fail", null)))).build();
// verify the reason and details
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(true));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(true));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo(), notNullValue());
assertThat(clusterState.getRoutingNodes().shardsWithState(UNASSIGNED).get(0).unassignedInfo().getReason(), equalTo(UnassignedInfo.Reason.ALLOCATION_FAILED));
@ -305,7 +305,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(false));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();
@ -330,7 +330,7 @@ public class UnassignedInfoTests extends ESAllocationTestCase {
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
// starting replicas
clusterState = ClusterState.builder(clusterState).routingResult(allocation.applyStartedShards(clusterState, clusterState.getRoutingNodes().shardsWithState(INITIALIZING))).build();
assertThat(clusterState.getRoutingNodes().hasUnassigned(), equalTo(false));
assertThat(clusterState.getRoutingNodes().unassigned().size() > 0, equalTo(false));
// remove node2 and reroute
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).remove("node2")).build();
clusterState = ClusterState.builder(clusterState).routingResult(allocation.reroute(clusterState)).build();

View File

@ -365,7 +365,7 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
public boolean allocateUnassigned(RoutingAllocation allocation) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
boolean changed = !unassigned.isEmpty();
for (ShardRouting sr : unassigned) {
for (ShardRouting sr : unassigned.drain()) {
switch (sr.id()) {
case 0:
if (sr.primary()) {
@ -405,7 +405,6 @@ public class BalanceConfigurationTests extends ESAllocationTestCase {
}
}
unassigned.clear();
return changed;
}
}), EmptyClusterInfoService.INSTANCE);

View File

@ -26,15 +26,16 @@ import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.routing.allocation.decider.ClusterRebalanceAllocationDecider;
import org.elasticsearch.cluster.routing.allocation.decider.FilterAllocationDecider;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESAllocationTestCase;
import org.elasticsearch.test.gateway.NoopGatewayAllocator;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.cluster.routing.ShardRoutingState.*;
import static org.elasticsearch.common.settings.Settings.settingsBuilder;
@ -628,6 +629,112 @@ public class ClusterRebalanceRoutingTests extends ESAllocationTestCase {
assertThat(routingNodes.node("node3").isEmpty(), equalTo(true));
}
public void testRebalanceWithIgnoredUnassignedShards() {
final AtomicBoolean allocateTest1 = new AtomicBoolean(false);
AllocationService strategy = createAllocationService(Settings.EMPTY, new NoopGatewayAllocator() {
@Override
public boolean allocateUnassigned(RoutingAllocation allocation) {
if (allocateTest1.get() == false) {
RoutingNodes.UnassignedShards unassigned = allocation.routingNodes().unassigned();
RoutingNodes.UnassignedShards.UnassignedIterator iterator = unassigned.iterator();
while (iterator.hasNext()) {
ShardRouting next = iterator.next();
if ("test1".equals(next.index())) {
iterator.removeAndIgnore();
}
}
}
return super.allocateUnassigned(allocation);
}
});
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.put(IndexMetaData.builder("test1").settings(settings(Version.CURRENT)).numberOfShards(2).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.addAsNew(metaData.index("test1"))
.build();
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
logger.info("start two nodes");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(INITIALIZING));
}
logger.debug("start all the primary shards for test");
RoutingNodes routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test", INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
logger.debug("now, start 1 more node, check that rebalancing will not happen since we unassigned shards");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes())
.put(newNode("node2")))
.build();
logger.debug("reroute and check that nothing has changed");
RoutingAllocation.Result reroute = strategy.reroute(clusterState);
assertFalse(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test").shard(i).primaryShard().state(), equalTo(STARTED));
}
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(UNASSIGNED));
}
logger.debug("now set allocateTest1 to true and reroute we should see the [test1] index initializing");
allocateTest1.set(true);
reroute = strategy.reroute(clusterState);
assertTrue(reroute.changed());
routingTable = reroute.routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(INITIALIZING));
}
logger.debug("now start initializing shards and expect exactly one rebalance from node1 to node 2 sicne index [test] is all on node1");
routingNodes = clusterState.getRoutingNodes();
routingTable = strategy.applyStartedShards(clusterState, routingNodes.shardsWithState("test1", INITIALIZING)).routingTable();
for (int i = 0; i < routingTable.index("test1").shards().size(); i++) {
assertThat(routingTable.index("test1").shard(i).shards().size(), equalTo(1));
assertThat(routingTable.index("test1").shard(i).primaryShard().state(), equalTo(STARTED));
}
int numStarted = 0;
int numRelocating = 0;
for (int i = 0; i < routingTable.index("test").shards().size(); i++) {
assertThat(routingTable.index("test").shard(i).shards().size(), equalTo(1));
if (routingTable.index("test").shard(i).primaryShard().state() == STARTED) {
numStarted++;
} else if (routingTable.index("test").shard(i).primaryShard().state() == RELOCATING) {
numRelocating++;
}
}
assertEquals(numStarted, 1);
assertEquals(numRelocating, 1);
}
public void testRebalanceWhileShardFetching() {
final AtomicBoolean hasFetches = new AtomicBoolean(true);
AllocationService strategy = createAllocationService(settingsBuilder().put(ClusterRebalanceAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ALLOW_REBALANCE,