Fix recovery throttling to properly handle relocating non-primary shards (#18701)

Relocation of non-primary shards is realized by recovering from the primary shard. Recovery throttling wrongly equates non-primary relocation as recovering a shard from the non-primary relocation source, however.

Closes #18640
This commit is contained in:
Yannick Welsch 2016-06-03 14:11:34 +02:00
parent 6c28235b03
commit 24a7b7224b
5 changed files with 199 additions and 153 deletions

View File

@ -108,18 +108,18 @@ public class RoutingNodes implements Iterable<RoutingNode> {
// add the counterpart shard with relocatingNodeId reflecting the source from which
// it's relocating from.
ShardRouting targetShardRouting = shard.buildTargetRelocatingShard();
addInitialRecovery(targetShardRouting, routingTable);
addInitialRecovery(targetShardRouting, indexShard.primary);
previousValue = entries.put(targetShardRouting.shardId(), targetShardRouting);
if (previousValue != null) {
throw new IllegalArgumentException("Cannot have two different shards with same shard id on same node");
}
assignedShardsAdd(targetShardRouting);
} else if (shard.active() == false) { // shards that are initializing without being relocated
} else if (shard.initializing()) {
if (shard.primary()) {
inactivePrimaryCount++;
}
inactiveShardCount++;
addInitialRecovery(shard, routingTable);
addInitialRecovery(shard, indexShard.primary);
}
} else {
unassignedShards.add(shard);
@ -134,48 +134,44 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
private void addRecovery(ShardRouting routing) {
addRecovery(routing, true, null);
updateRecoveryCounts(routing, true, findAssignedPrimaryIfPeerRecovery(routing));
}
private void removeRecovery(ShardRouting routing) {
addRecovery(routing, false, null);
updateRecoveryCounts(routing, false, findAssignedPrimaryIfPeerRecovery(routing));
}
private void addInitialRecovery(ShardRouting routing, RoutingTable routingTable) {
addRecovery(routing, true, routingTable);
private void addInitialRecovery(ShardRouting routing, ShardRouting initialPrimaryShard) {
updateRecoveryCounts(routing, true, initialPrimaryShard);
}
private void addRecovery(final ShardRouting routing, final boolean increment, final RoutingTable routingTable) {
private void updateRecoveryCounts(final ShardRouting routing, final boolean increment, @Nullable final ShardRouting primary) {
final int howMany = increment ? 1 : -1;
assert routing.initializing() : "routing must be initializing: " + routing;
// TODO: check primary == null || primary.active() after all tests properly add ReplicaAfterPrimaryActiveAllocationDecider
assert primary == null || primary.assignedToNode() :
"shard is initializing but its primary is not assigned to a node";
Recoveries.getOrAdd(recoveriesPerNode, routing.currentNodeId()).addIncoming(howMany);
final String sourceNodeId;
if (routing.relocatingNodeId() != null) { // this is a relocation-target
sourceNodeId = routing.relocatingNodeId();
if (routing.primary() && increment == false) { // primary is done relocating
if (routing.isPeerRecovery()) {
// add/remove corresponding outgoing recovery on node with primary shard
if (primary == null) {
throw new IllegalStateException("shard is peer recovering but primary is unassigned");
}
Recoveries.getOrAdd(recoveriesPerNode, primary.currentNodeId()).addOutgoing(howMany);
if (increment == false && routing.primary() && routing.relocatingNodeId() != null) {
// primary is done relocating, move non-primary recoveries from old primary to new primary
int numRecoveringReplicas = 0;
for (ShardRouting assigned : assignedShards(routing.shardId())) {
if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
if (assigned.primary() == false && assigned.isPeerRecovery()) {
numRecoveringReplicas++;
}
}
// we transfer the recoveries to the relocated primary
recoveriesPerNode.get(sourceNodeId).addOutgoing(-numRecoveringReplicas);
recoveriesPerNode.get(routing.relocatingNodeId()).addOutgoing(-numRecoveringReplicas);
recoveriesPerNode.get(routing.currentNodeId()).addOutgoing(numRecoveringReplicas);
}
} else if (routing.primary() == false) { // primary without relocationID is initial recovery
ShardRouting primary = findPrimary(routing);
if (primary == null && routingTable != null) {
primary = routingTable.index(routing.index().getName()).shard(routing.shardId().id()).primary;
} else if (primary == null) {
throw new IllegalStateException("replica is initializing but primary is unassigned");
}
sourceNodeId = primary.currentNodeId();
} else {
sourceNodeId = null;
}
if (sourceNodeId != null) {
Recoveries.getOrAdd(recoveriesPerNode, sourceNodeId).addOutgoing(howMany);
}
}
@ -187,18 +183,21 @@ public class RoutingNodes implements Iterable<RoutingNode> {
return recoveriesPerNode.getOrDefault(nodeId, Recoveries.EMPTY).getOutgoing();
}
private ShardRouting findPrimary(ShardRouting routing) {
List<ShardRouting> shardRoutings = assignedShards.get(routing.shardId());
@Nullable
private ShardRouting findAssignedPrimaryIfPeerRecovery(ShardRouting routing) {
ShardRouting primary = null;
if (shardRoutings != null) {
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.primary()) {
if (shardRouting.active()) {
return shardRouting;
} else if (primary == null) {
primary = shardRouting;
} else if (primary.relocatingNodeId() != null) {
primary = shardRouting;
if (routing.isPeerRecovery()) {
List<ShardRouting> shardRoutings = assignedShards.get(routing.shardId());
if (shardRoutings != null) {
for (ShardRouting shardRouting : shardRoutings) {
if (shardRouting.primary()) {
if (shardRouting.active()) {
return shardRouting;
} else if (primary == null) {
primary = shardRouting;
} else if (primary.relocatingNodeId() != null) {
primary = shardRouting;
}
}
}
}
@ -500,7 +499,6 @@ public class RoutingNodes implements Iterable<RoutingNode> {
ShardRouting relocationMarkerRemoved = shard.removeRelocationSource();
updateAssigned(shard, relocationMarkerRemoved);
inactiveShardCount++; // relocation targets are not counted as inactive shards whereas initializing shards are
Recoveries.getOrAdd(recoveriesPerNode, shard.relocatingNodeId()).addOutgoing(-1);
return relocationMarkerRemoved;
}
@ -856,20 +854,17 @@ public class RoutingNodes implements Iterable<RoutingNode> {
for (ShardRouting routing : routingNode) {
if (routing.initializing()) {
incoming++;
} else if (routing.relocating()) {
outgoing++;
}
if (routing.primary() && (routing.initializing() && routing.relocatingNodeId() != null) == false) { // we don't count the initialization end of the primary relocation
List<ShardRouting> shardRoutings = routingNodes.assignedShards.get(routing.shardId());
for (ShardRouting assigned : shardRoutings) {
if (assigned.primary() == false && assigned.initializing() && assigned.relocatingNodeId() == null) {
if (routing.primary() && routing.isPeerRecovery() == false) {
for (ShardRouting assigned : routingNodes.assignedShards.get(routing.shardId())) {
if (assigned.isPeerRecovery()) {
outgoing++;
}
}
}
}
}
assert incoming == value.incoming : incoming + " != " + value.incoming;
assert incoming == value.incoming : incoming + " != " + value.incoming + " node: " + routingNode;
assert outgoing == value.outgoing : outgoing + " != " + value.outgoing + " node: " + routingNode;
}

View File

@ -28,6 +28,9 @@ import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.THROTTLE;
import static org.elasticsearch.cluster.routing.allocation.decider.Decision.YES;
/**
* {@link ThrottlingAllocationDecider} controls the recovery process per node in
* the cluster. It exposes two settings via the cluster update API that allow
@ -109,50 +112,83 @@ public class ThrottlingAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (shardRouting.primary()) {
assert shardRouting.unassigned() || shardRouting.active();
if (shardRouting.unassigned()) {
// primary is unassigned, means we are going to do recovery from gateway
// count *just the primary* currently doing recovery on the node and check against concurrent_recoveries
int primariesInRecovery = 0;
for (ShardRouting shard : node) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
}
if (shardRouting.primary() && shardRouting.unassigned()) {
assert initializingShard(shardRouting, node.nodeId()).isPeerRecovery() == false;
// primary is unassigned, means we are going to do recovery from store, snapshot or local shards
// count *just the primaries* currently doing recovery on the node and check against primariesInitialRecoveries
int primariesInRecovery = 0;
for (ShardRouting shard : node) {
// when a primary shard is INITIALIZING, it can be because of *initial recovery* or *relocation from another node*
// we only count initial recoveries here, so we need to make sure that relocating node is null
if (shard.initializing() && shard.primary() && shard.relocatingNodeId() == null) {
primariesInRecovery++;
}
if (primariesInRecovery >= primariesInitialRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
}
if (primariesInRecovery >= primariesInitialRecoveries) {
// TODO: Should index creation not be throttled for primary shards?
return allocation.decision(THROTTLE, NAME, "too many primaries are currently recovering [%d], limit: [%d]",
primariesInRecovery, primariesInitialRecoveries);
} else {
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
}
} else {
// Peer recovery
assert initializingShard(shardRouting, node.nodeId()).isPeerRecovery();
// Allocating a shard to this node will increase the incoming recoveries
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentInRecoveries >= concurrentIncomingRecoveries) {
return allocation.decision(THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
currentInRecoveries, concurrentIncomingRecoveries);
} else {
// search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
if (primaryShard == null) {
return allocation.decision(Decision.NO, NAME, "primary shard for this replica is not yet active");
}
int primaryNodeOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(primaryShard.currentNodeId());
if (primaryNodeOutRecoveries >= concurrentOutgoingRecoveries) {
return allocation.decision(THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
primaryNodeOutRecoveries, concurrentOutgoingRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
return allocation.decision(YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
primaryNodeOutRecoveries,
concurrentOutgoingRecoveries,
currentInRecoveries,
concurrentIncomingRecoveries);
}
}
}
// TODO should we allow shards not allocated post API to always allocate?
// either primary or replica doing recovery (from peer shard)
// count the number of recoveries on the node, its for both target (INITIALIZING) and source (RELOCATING)
return canAllocate(node, allocation);
}
@Override
public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) {
int currentOutRecoveries = allocation.routingNodes().getOutgoingRecoveries(node.nodeId());
int currentInRecoveries = allocation.routingNodes().getIncomingRecoveries(node.nodeId());
if (currentOutRecoveries >= concurrentOutgoingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many outgoing shards are currently recovering [%d], limit: [%d]",
currentOutRecoveries, concurrentOutgoingRecoveries);
} else if (currentInRecoveries >= concurrentIncomingRecoveries) {
return allocation.decision(Decision.THROTTLE, NAME, "too many incoming shards are currently recovering [%d], limit: [%d]",
currentInRecoveries, concurrentIncomingRecoveries);
} else {
return allocation.decision(Decision.YES, NAME, "below shard recovery limit of outgoing: [%d < %d] incoming: [%d < %d]",
currentOutRecoveries,
concurrentOutgoingRecoveries,
currentInRecoveries,
concurrentIncomingRecoveries);
/**
* The shard routing passed to {@link #canAllocate(ShardRouting, RoutingNode, RoutingAllocation)} is not the initializing shard to this
* node but:
* - the unassigned shard routing in case if we want to assign an unassigned shard to this node.
* - the initializing shard routing if we want to assign the initializing shard to this node instead
* - the started shard routing in case if we want to check if we can relocate to this node.
* - the relocating shard routing if we want to relocate to this node now instead.
*
* This method returns the corresponding initializing shard that would be allocated to this node.
*/
private ShardRouting initializingShard(ShardRouting shardRouting, String currentNodeId) {
final ShardRouting initializingShard;
if (shardRouting.unassigned()) {
initializingShard = shardRouting.initialize(currentNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (shardRouting.initializing()) {
initializingShard = shardRouting.moveToUnassigned(shardRouting.unassignedInfo())
.initialize(currentNodeId, null, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE);
} else if (shardRouting.relocating()) {
initializingShard = shardRouting.cancelRelocation()
.relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
.buildTargetRelocatingShard();
} else {
assert shardRouting.started();
initializingShard = shardRouting.relocate(currentNodeId, ShardRouting.UNAVAILABLE_EXPECTED_SHARD_SIZE)
.buildTargetRelocatingShard();
}
assert initializingShard.initializing();
return initializingShard;
}
}

View File

@ -34,7 +34,7 @@ public final class RandomShardRoutingMutator {
public static ShardRouting randomChange(ShardRouting shardRouting, String[] nodes) {
switch (randomInt(2)) {
case 0:
if (shardRouting.unassigned() == false) {
if (shardRouting.unassigned() == false && shardRouting.primary() == false) {
shardRouting = shardRouting.moveToUnassigned(new UnassignedInfo(randomReason(), randomAsciiOfLength(10)));
} else if (shardRouting.unassignedInfo() != null) {
shardRouting = shardRouting.updateUnassignedInfo(new UnassignedInfo(randomReason(), randomAsciiOfLength(10)));

View File

@ -54,7 +54,6 @@ public class RandomAllocationDeciderTests extends ESAllocationTestCase {
* amount of iterations the test allows allocation unless the same shard is
* already allocated on a node and balances the cluster to gain optimal
* balance.*/
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/pull/18701")
public void testRandomDecisions() {
RandomAllocationDecider randomAllocationDecider = new RandomAllocationDecider(random());
AllocationService strategy = new AllocationService(Settings.builder().build(), new AllocationDeciders(Settings.EMPTY,

View File

@ -19,11 +19,13 @@
package org.elasticsearch.cluster.routing.allocation;
import com.carrotsearch.hppc.IntHashSet;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RestoreSource;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
@ -31,6 +33,8 @@ import org.elasticsearch.cluster.routing.allocation.decider.Decision;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.test.ESAllocationTestCase;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
@ -57,9 +61,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(10).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
RoutingTable routingTable = createRecoveryRoutingTable(metaData.index("test"));
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
@ -118,9 +120,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(5).numberOfReplicas(1))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
RoutingTable routingTable = createRecoveryRoutingTable(metaData.index("test"));
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
@ -188,9 +188,7 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(9).numberOfReplicas(0))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
RoutingTable routingTable = createRecoveryRoutingTable(metaData.index("test"));
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
@ -242,89 +240,107 @@ public class ThrottlingAllocationTests extends ESAllocationTestCase {
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
}
public void testOutgoingThrottlesAllocaiton() {
Settings settings = Settings.builder()
.put("cluster.routing.allocation.node_concurrent_recoveries", 1)
.put("cluster.routing.allocation.node_initial_primaries_recoveries", 1)
.put("cluster.routing.allocation.cluster_concurrent_rebalance", 1)
.build();
AllocationService strategy = createAllocationService(settings);
public void testOutgoingThrottlesAllocation() {
AllocationService strategy = createAllocationService(Settings.builder()
.put("cluster.routing.allocation.node_concurrent_outgoing_recoveries", 1)
.build());
logger.info("Building initial routing table");
MetaData metaData = MetaData.builder()
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(3).numberOfReplicas(0))
.put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(1).numberOfReplicas(2))
.build();
RoutingTable routingTable = RoutingTable.builder()
.addAsNew(metaData.index("test"))
.build();
RoutingTable routingTable = createRecoveryRoutingTable(metaData.index("test"));
ClusterState clusterState = ClusterState.builder(org.elasticsearch.cluster.ClusterName.DEFAULT).metaData(metaData).routingTable(routingTable).build();
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1")).put(newNode("node2")).put(newNode("node3"))).build();
logger.info("start one node, do reroute, only 1 should initialize");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder().put(newNode("node1"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(3));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(0));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(2));
logger.info("start initializing");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(1));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(2));
RoutingAllocation.Result reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node1").iterator().next().shardId().id(), "node1", "node2")), false, false);
assertEquals(reroute.explanations().explanations().size(), 1);
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.YES);
routingTable = reroute.routingTable();
logger.info("start one more node, first non-primary should start being allocated");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node2"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
// outgoing throttles
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node3").iterator().next().shardId().id(), "node3", "node1")), true, false);
assertEquals(reroute.explanations().explanations().size(), 1);
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(1));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(1));
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
logger.info("start initializing non-primary");
routingTable = strategy.applyStartedShards(clusterState, routingTable.shardsWithState(INITIALIZING)).routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(0));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(1));
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 0);
logger.info("start one more node, initializing second non-primary");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node3"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
// incoming throttles
reroute = strategy.reroute(clusterState, new AllocationCommands(new MoveAllocationCommand("test", clusterState.getRoutingNodes().node("node3").iterator().next().shardId().id(), "node3", "node2")), true, false);
logger.info("start one more node");
clusterState = ClusterState.builder(clusterState).nodes(DiscoveryNodes.builder(clusterState.nodes()).put(newNode("node4"))).build();
routingTable = strategy.reroute(clusterState, "reroute").routingTable();
clusterState = ClusterState.builder(clusterState).routingTable(routingTable).build();
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
logger.info("move started non-primary to new node");
RoutingAllocation.Result reroute = strategy.reroute(clusterState, new AllocationCommands(
new MoveAllocationCommand("test", 0, "node2", "node4")), true, false);
assertEquals(reroute.explanations().explanations().size(), 1);
assertEquals(reroute.explanations().explanations().get(0).decisions().type(), Decision.Type.THROTTLE);
// even though it is throttled, move command still forces allocation
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node1"), 0);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node2"), 1);
assertEquals(clusterState.getRoutingNodes().getIncomingRecoveries("node3"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 1);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node3"), 0);
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(2));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(1));
clusterState = ClusterState.builder(clusterState).routingResult(reroute).build();
routingTable = clusterState.routingTable();
assertThat(routingTable.shardsWithState(STARTED).size(), equalTo(1));
assertThat(routingTable.shardsWithState(RELOCATING).size(), equalTo(1));
assertThat(routingTable.shardsWithState(INITIALIZING).size(), equalTo(2));
assertThat(routingTable.shardsWithState(UNASSIGNED).size(), equalTo(0));
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node1"), 2);
assertEquals(clusterState.getRoutingNodes().getOutgoingRecoveries("node2"), 0);
}
private RoutingTable createRecoveryRoutingTable(IndexMetaData indexMetaData) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
switch (randomInt(5)) {
case 0: routingTableBuilder.addAsRecovery(indexMetaData); break;
case 1: routingTableBuilder.addAsFromCloseToOpen(indexMetaData); break;
case 2: routingTableBuilder.addAsFromDangling(indexMetaData); break;
case 3: routingTableBuilder.addAsNewRestore(indexMetaData,
new RestoreSource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
indexMetaData.getIndex().getName()), new IntHashSet()); break;
case 4: routingTableBuilder.addAsRestore(indexMetaData,
new RestoreSource(new Snapshot("repo", new SnapshotId("snap", "randomId")), Version.CURRENT,
indexMetaData.getIndex().getName())); break;
case 5: routingTableBuilder.addAsNew(indexMetaData); break;
default: throw new IndexOutOfBoundsException();
}
return routingTableBuilder.build();
}
}