Simplify assignToNode to only do initializing

The method really only should do the move from unassigned to initializing, all the other moves have explicit methods like relocate
This commit is contained in:
Shay Banon 2015-07-14 15:47:32 +02:00
parent 5324855224
commit c6b110c6ef
8 changed files with 39 additions and 69 deletions

View File

@ -363,37 +363,16 @@ public class RoutingNodes implements Iterable<RoutingNode> {
}
/**
* Assign a shard to a node. This will increment the inactiveShardCount counter
* and the inactivePrimaryCount counter if the shard is the primary.
* In case the shard is already assigned and started, it will be marked as
* relocating, which is accounted for, too, so the number of concurrent relocations
* can be retrieved easily.
* This method can be called several times for the same shard, only the first time
* will change the state.
*
* INITIALIZING => INITIALIZING
* UNASSIGNED => INITIALIZING
* STARTED => RELOCATING
* RELOCATING => RELOCATING
*
* @param shard the shard to be assigned
* @param nodeId the nodeId this shard should initialize on or relocate from
* Moves a shard from unassigned to initialize state
*/
public void assign(ShardRouting shard, String nodeId) {
// state will not change if the shard is already initializing.
ShardRoutingState oldState = shard.state();
shard.assignToNode(nodeId);
public void initialize(ShardRouting shard, String nodeId) {
assert shard.unassigned() : shard;
shard.initialize(nodeId);
node(nodeId).add(shard);
if (oldState == ShardRoutingState.UNASSIGNED) {
inactiveShardCount++;
if (shard.primary()) {
inactivePrimaryCount++;
}
}
if (shard.state() == ShardRoutingState.RELOCATING) {
relocatingShards++;
}
assignedShardsAdd(shard);
}
@ -406,7 +385,8 @@ public class RoutingNodes implements Iterable<RoutingNode> {
relocatingShards++;
shard.relocate(nodeId);
ShardRouting target = shard.buildTargetRelocatingShard();
assign(target, target.currentNodeId());
node(target.currentNodeId()).add(target);
assignedShardsAdd(target);
return target;
}

View File

@ -350,7 +350,7 @@ public final class ShardRouting implements Streamable, ToXContent {
void moveToUnassigned(UnassignedInfo unassignedInfo) {
ensureNotFrozen();
version++;
assert state != ShardRoutingState.UNASSIGNED;
assert state != ShardRoutingState.UNASSIGNED : this;
state = ShardRoutingState.UNASSIGNED;
currentNodeId = null;
relocatingNodeId = null;
@ -358,24 +358,15 @@ public final class ShardRouting implements Streamable, ToXContent {
}
/**
* Assign this shard to a node.
*
* @param nodeId id of the node to assign this shard to
* Initializes an unassigned shard on a node.
*/
void assignToNode(String nodeId) {
void initialize(String nodeId) {
ensureNotFrozen();
version++;
if (currentNodeId == null) {
assert state == ShardRoutingState.UNASSIGNED;
assert state == ShardRoutingState.UNASSIGNED : this;
assert relocatingNodeId == null : this;
state = ShardRoutingState.INITIALIZING;
currentNodeId = nodeId;
relocatingNodeId = null;
} else if (state == ShardRoutingState.STARTED) {
state = ShardRoutingState.RELOCATING;
relocatingNodeId = nodeId;
} else if (state == ShardRoutingState.RELOCATING) {
assert nodeId.equals(relocatingNodeId);
}
}
/**
@ -386,7 +377,7 @@ public final class ShardRouting implements Streamable, ToXContent {
void relocate(String relocatingNodeId) {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.STARTED;
assert state == ShardRoutingState.STARTED : this;
state = ShardRoutingState.RELOCATING;
this.relocatingNodeId = relocatingNodeId;
}
@ -398,9 +389,9 @@ public final class ShardRouting implements Streamable, ToXContent {
void cancelRelocation() {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.RELOCATING;
assert assignedToNode();
assert relocatingNodeId != null;
assert state == ShardRoutingState.RELOCATING : this;
assert assignedToNode() : this;
assert relocatingNodeId != null : this;
state = ShardRoutingState.STARTED;
relocatingNodeId = null;
@ -424,7 +415,7 @@ public final class ShardRouting implements Streamable, ToXContent {
void moveToStarted() {
ensureNotFrozen();
version++;
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING;
assert state == ShardRoutingState.INITIALIZING || state == ShardRoutingState.RELOCATING : this;
relocatingNodeId = null;
restoreSource = null;
state = ShardRoutingState.STARTED;

View File

@ -688,7 +688,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shard, minNode.getNodeId());
}
routingNodes.assign(shard, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(shard, routingNodes.node(minNode.getNodeId()).nodeId());
changed = true;
continue; // don't add to ignoreUnassigned
} else {
@ -783,8 +783,7 @@ public class BalancedShardsAllocator extends AbstractComponent implements Shards
routingNodes.relocate(candidate, lowRoutingNode.nodeId());
} else {
assert candidate.unassigned();
routingNodes.assign(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
routingNodes.initialize(candidate, routingNodes.node(minNode.getNodeId()).nodeId());
}
return true;

View File

@ -225,7 +225,7 @@ public class AllocateAllocationCommand implements AllocationCommand {
continue;
}
it.remove();
routingNodes.assign(shardRouting, routingNode.nodeId());
routingNodes.initialize(shardRouting, routingNode.nodeId());
if (shardRouting.primary()) {
// we need to clear the post allocation flag, since its an explicit allocation of the primary shard
// and we want to force allocate it (and create a new index for it)

View File

@ -339,7 +339,7 @@ public class GatewayAllocator extends AbstractComponent {
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId());
routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
// found a node, so no throttling, no "no", and break out of the loop
@ -359,7 +359,7 @@ public class GatewayAllocator extends AbstractComponent {
// we found a match
changed = true;
// make sure we create one with the version from the recovered state
routingNodes.assign(new ShardRouting(shard, highestVersion), node.nodeId());
routingNodes.initialize(new ShardRouting(shard, highestVersion), node.nodeId());
unassignedIterator.remove();
}
} else {
@ -514,7 +514,7 @@ public class GatewayAllocator extends AbstractComponent {
}
// we found a match
changed = true;
routingNodes.assign(shard, lastNodeMatched.nodeId());
routingNodes.initialize(shard, lastNodeMatched.nodeId());
unassignedIterator.remove();
}
} else if (hasReplicaData == false) {

View File

@ -82,7 +82,7 @@ public class ShardRoutingTests extends ElasticsearchTestCase {
}
try {
routing.assignToNode("boom");
routing.initialize("boom");
fail("must be frozen");
} catch (IllegalStateException ex) {
// expected

View File

@ -189,7 +189,7 @@ public class UnassignedInfoTests extends ElasticsearchAllocationTestCase {
ShardRouting shard = TestShardRouting.newShardRouting("test", 1, null, null, null, true, ShardRoutingState.UNASSIGNED, 1, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null));
ShardRouting mutable = new ShardRouting(shard);
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.assignToNode("test_node");
mutable.initialize("test_node");
assertThat(mutable.state(), equalTo(ShardRoutingState.INITIALIZING));
assertThat(mutable.unassignedInfo(), notNullValue());
mutable.moveToStarted();

View File

@ -369,37 +369,37 @@ public class BalanceConfigurationTests extends ElasticsearchAllocationTestCase {
switch (sr.id()) {
case 0:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node1");
allocation.routingNodes().initialize(sr, "node1");
} else {
allocation.routingNodes().assign(sr, "node0");
allocation.routingNodes().initialize(sr, "node0");
}
break;
case 1:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node1");
allocation.routingNodes().initialize(sr, "node1");
} else {
allocation.routingNodes().assign(sr, "node2");
allocation.routingNodes().initialize(sr, "node2");
}
break;
case 2:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node3");
allocation.routingNodes().initialize(sr, "node3");
} else {
allocation.routingNodes().assign(sr, "node2");
allocation.routingNodes().initialize(sr, "node2");
}
break;
case 3:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node3");
allocation.routingNodes().initialize(sr, "node3");
} else {
allocation.routingNodes().assign(sr, "node1");
allocation.routingNodes().initialize(sr, "node1");
}
break;
case 4:
if (sr.primary()) {
allocation.routingNodes().assign(sr, "node2");
allocation.routingNodes().initialize(sr, "node2");
} else {
allocation.routingNodes().assign(sr, "node0");
allocation.routingNodes().initialize(sr, "node0");
}
break;
}