Simplify ShardRouting and centralize move to unassigned
Make sure there is a single place where shard routing move to unassigned, so we can add additional metadata when it does, also, simplify shard routing implementations a bit closes #11634
This commit is contained in:
parent
9b833fdf4e
commit
3a97f322f7
|
@ -60,44 +60,33 @@ public class ImmutableShardRouting implements Streamable, Serializable, ShardRou
|
|||
}
|
||||
|
||||
public ImmutableShardRouting(ShardRouting copy) {
|
||||
this(copy.index(), copy.id(), copy.currentNodeId(), copy.primary(), copy.state(), copy.version());
|
||||
this.relocatingNodeId = copy.relocatingNodeId();
|
||||
this.restoreSource = copy.restoreSource();
|
||||
if (copy instanceof ImmutableShardRouting) {
|
||||
this.shardIdentifier = ((ImmutableShardRouting) copy).shardIdentifier;
|
||||
}
|
||||
this(copy, copy.version());
|
||||
}
|
||||
|
||||
public ImmutableShardRouting(ShardRouting copy, long version) {
|
||||
this(copy.index(), copy.id(), copy.currentNodeId(), copy.primary(), copy.state(), copy.version());
|
||||
this.relocatingNodeId = copy.relocatingNodeId();
|
||||
this.restoreSource = copy.restoreSource();
|
||||
this.version = version;
|
||||
if (copy instanceof ImmutableShardRouting) {
|
||||
this.shardIdentifier = ((ImmutableShardRouting) copy).shardIdentifier;
|
||||
}
|
||||
this(copy.index(), copy.id(), copy.currentNodeId(), copy.relocatingNodeId(), copy.restoreSource(), copy.primary(), copy.state(), version);
|
||||
}
|
||||
|
||||
public ImmutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
|
||||
this(index, shardId, currentNodeId, null, primary, state, version);
|
||||
}
|
||||
|
||||
public ImmutableShardRouting(String index, int shardId, String currentNodeId,
|
||||
String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
|
||||
this(index, shardId, currentNodeId, primary, state, version);
|
||||
this.relocatingNodeId = relocatingNodeId;
|
||||
this(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version);
|
||||
}
|
||||
|
||||
public ImmutableShardRouting(String index, int shardId, String currentNodeId,
|
||||
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
|
||||
this(index, shardId, currentNodeId, relocatingNodeId, primary, state, version);
|
||||
this.restoreSource = restoreSource;
|
||||
}
|
||||
|
||||
public ImmutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
|
||||
this.index = index;
|
||||
this.shardId = shardId;
|
||||
this.currentNodeId = currentNodeId;
|
||||
this.relocatingNodeId = relocatingNodeId;
|
||||
this.primary = primary;
|
||||
this.state = state;
|
||||
this.asList = ImmutableList.of((ShardRouting) this);
|
||||
this.version = version;
|
||||
this.restoreSource = restoreSource;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -32,25 +32,25 @@ public class MutableShardRouting extends ImmutableShardRouting {
|
|||
}
|
||||
|
||||
public MutableShardRouting(ShardRouting copy, long version) {
|
||||
super(copy);
|
||||
this.version = version;
|
||||
}
|
||||
|
||||
public MutableShardRouting(String index, int shardId, String currentNodeId, boolean primary, ShardRoutingState state, long version) {
|
||||
super(index, shardId, currentNodeId, primary, state, version);
|
||||
}
|
||||
|
||||
public MutableShardRouting(String index, int shardId, String currentNodeId,
|
||||
String relocatingNodeId, boolean primary, ShardRoutingState state, long version) {
|
||||
super(index, shardId, currentNodeId, relocatingNodeId, null, primary, state, version);
|
||||
super(copy, version);
|
||||
}
|
||||
|
||||
public MutableShardRouting(String index, int shardId, String currentNodeId,
|
||||
String relocatingNodeId, RestoreSource restoreSource, boolean primary, ShardRoutingState state, long version) {
|
||||
super(index, shardId, currentNodeId, relocatingNodeId, restoreSource, primary, state, version);
|
||||
assert state != ShardRoutingState.UNASSIGNED : "new mutable routing should not be created with UNASSIGNED state, should moveToUnassigned";
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Moves the shard to unassigned state.
|
||||
*/
|
||||
void moveToUnassigned() {
|
||||
version++;
|
||||
assert state != ShardRoutingState.UNASSIGNED;
|
||||
state = ShardRoutingState.UNASSIGNED;
|
||||
currentNodeId = null;
|
||||
relocatingNodeId = null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Assign this shard to a node.
|
||||
|
|
|
@ -109,7 +109,7 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
// add the counterpart shard with relocatingNodeId reflecting the source from which
|
||||
// it's relocating from.
|
||||
sr = new MutableShardRouting(shard.index(), shard.id(), shard.relocatingNodeId(),
|
||||
shard.currentNodeId(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version());
|
||||
shard.currentNodeId(), shard.restoreSource(), shard.primary(), ShardRoutingState.INITIALIZING, shard.version());
|
||||
entries.add(sr);
|
||||
assignedShardsAdd(sr);
|
||||
} else if (!shard.active()) { // shards that are initializing without being relocated
|
||||
|
@ -796,9 +796,10 @@ public class RoutingNodes implements Iterable<RoutingNode> {
|
|||
}
|
||||
|
||||
public void moveToUnassigned() {
|
||||
iterator().remove();
|
||||
unassigned().add(new MutableShardRouting(shard.index(), shard.id(),
|
||||
null, shard.primary(), ShardRoutingState.UNASSIGNED, shard.version() + 1));
|
||||
remove();
|
||||
MutableShardRouting unassigned = new MutableShardRouting(shard); // protective copy of the mutable shard
|
||||
unassigned.moveToUnassigned();
|
||||
unassigned().add(unassigned);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -470,14 +470,11 @@ public class AllocationService extends AbstractComponent {
|
|||
MutableShardRouting shardRouting = relocatingFromNode.next();
|
||||
if (shardRouting.equals(failedShard)) {
|
||||
dirty = true;
|
||||
relocatingFromNode.remove();
|
||||
if (addToIgnoreList) {
|
||||
// make sure we ignore this shard on the relevant node
|
||||
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
|
||||
}
|
||||
|
||||
routingNodes.unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(),
|
||||
null, failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
|
||||
relocatingFromNode.moveToUnassigned();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -512,7 +509,6 @@ public class AllocationService extends AbstractComponent {
|
|||
// make sure we ignore this shard on the relevant node
|
||||
allocation.addIgnoreShardForNode(failedShard.shardId(), failedShard.currentNodeId());
|
||||
}
|
||||
node.remove();
|
||||
// move all the shards matching the failed shard to the end of the unassigned list
|
||||
// so we give a chance for other allocations and won't create poison failed allocations
|
||||
// that can keep other shards from being allocated (because of limits applied on how many
|
||||
|
@ -529,9 +525,7 @@ public class AllocationService extends AbstractComponent {
|
|||
routingNodes.unassigned().addAll(shardsToMove);
|
||||
}
|
||||
|
||||
routingNodes.unassigned().add(new MutableShardRouting(failedShard.index(), failedShard.id(), null,
|
||||
null, failedShard.restoreSource(), failedShard.primary(), ShardRoutingState.UNASSIGNED, failedShard.version() + 1));
|
||||
|
||||
node.moveToUnassigned();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -222,9 +222,7 @@ public class CancelAllocationCommand implements AllocationCommand {
|
|||
throw new IllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " +
|
||||
discoNode + ", shard is primary and started");
|
||||
}
|
||||
it.remove();
|
||||
allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),
|
||||
null, shardRouting.primary(), ShardRoutingState.UNASSIGNED, shardRouting.version() + 1));
|
||||
it.moveToUnassigned();
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
|
|
|
@ -114,21 +114,17 @@ public class ClusterHealthResponsesTests extends ElasticsearchTestCase {
|
|||
state = ShardRoutingState.STARTED;
|
||||
} else if (i > 3) {
|
||||
state = ShardRoutingState.RELOCATING;
|
||||
} else if (i > 1) {
|
||||
state = ShardRoutingState.INITIALIZING;
|
||||
} else {
|
||||
state = ShardRoutingState.UNASSIGNED;
|
||||
state = ShardRoutingState.INITIALIZING;
|
||||
}
|
||||
|
||||
switch (state) {
|
||||
case UNASSIGNED:
|
||||
return new MutableShardRouting(index, shardId, null, primary, ShardRoutingState.UNASSIGNED, 1);
|
||||
case STARTED:
|
||||
return new MutableShardRouting(index, shardId, "node_" + Integer.toString(node_id++), primary, ShardRoutingState.STARTED, 1);
|
||||
return new MutableShardRouting(index, shardId, "node_" + Integer.toString(node_id++), null, null, primary, ShardRoutingState.STARTED, 1);
|
||||
case INITIALIZING:
|
||||
return new MutableShardRouting(index, shardId, "node_" + Integer.toString(node_id++), primary, ShardRoutingState.INITIALIZING, 1);
|
||||
return new MutableShardRouting(index, shardId, "node_" + Integer.toString(node_id++), null, null, primary, ShardRoutingState.INITIALIZING, 1);
|
||||
case RELOCATING:
|
||||
return new MutableShardRouting(index, shardId, "node_" + Integer.toString(node_id++), "node_" + Integer.toString(node_id++), primary, ShardRoutingState.RELOCATING, 1);
|
||||
return new MutableShardRouting(index, shardId, "node_" + Integer.toString(node_id++), "node_" + Integer.toString(node_id++), null, primary, ShardRoutingState.RELOCATING, 1);
|
||||
default:
|
||||
throw new ElasticsearchException("Unknown state: " + state.name());
|
||||
}
|
||||
|
|
|
@ -222,7 +222,7 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
|
|||
int replicaCount = randomIntBetween(1, 10);
|
||||
for (int j = 0; j < replicaCount; j++) {
|
||||
indexShard.addShard(
|
||||
new MutableShardRouting(index, i, randomFrom(nodeIds), j == 0, ShardRoutingState.fromValue((byte) randomIntBetween(1, 4)), 1));
|
||||
new MutableShardRouting(index, i, randomFrom(nodeIds), null, null, j == 0, ShardRoutingState.fromValue((byte) randomIntBetween(2, 4)), 1));
|
||||
}
|
||||
builder.addIndexShard(indexShard.build());
|
||||
}
|
||||
|
@ -663,4 +663,4 @@ public class ClusterStateDiffTests extends ElasticsearchIntegrationTest {
|
|||
private String randomName(String prefix) {
|
||||
return prefix + Strings.randomBase64UUID(getRandom());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ public abstract class CatAllocationTestBase extends ElasticsearchAllocationTestC
|
|||
ShardRoutingState state = ShardRoutingState.valueOf(matcher.group(4));
|
||||
String ip = matcher.group(5);
|
||||
nodes.add(ip);
|
||||
MutableShardRouting routing = new MutableShardRouting(index, shard, ip, primary, state, 1);
|
||||
MutableShardRouting routing = new MutableShardRouting(index, shard, ip, null, null, primary, state, 1);
|
||||
idx.add(routing);
|
||||
logger.debug("Add routing {}", routing);
|
||||
} else {
|
||||
|
|
|
@ -829,8 +829,8 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
|
|||
.build();
|
||||
|
||||
// Two shards consuming each 80% of disk space while 70% is allowed, so shard 0 isn't allowed here
|
||||
MutableShardRouting firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
|
||||
MutableShardRouting secondRouting = new MutableShardRouting("test", 1, "node1", true, ShardRoutingState.STARTED, 1);
|
||||
MutableShardRouting firstRouting = new MutableShardRouting("test", 0, "node1", null, null, true, ShardRoutingState.STARTED, 1);
|
||||
MutableShardRouting secondRouting = new MutableShardRouting("test", 1, "node1", null, null, true, ShardRoutingState.STARTED, 1);
|
||||
RoutingNode firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
|
||||
RoutingTable.Builder builder = RoutingTable.builder().add(
|
||||
IndexRoutingTable.builder("test")
|
||||
|
@ -849,8 +849,8 @@ public class DiskThresholdDeciderTests extends ElasticsearchAllocationTestCase {
|
|||
assertThat(decision.type(), equalTo(Decision.Type.NO));
|
||||
|
||||
// Two shards consuming each 80% of disk space while 70% is allowed, but one is relocating, so shard 0 can stay
|
||||
firstRouting = new MutableShardRouting("test", 0, "node1", true, ShardRoutingState.STARTED, 1);
|
||||
secondRouting = new MutableShardRouting("test", 1, "node1", "node2", true, ShardRoutingState.RELOCATING, 1);
|
||||
firstRouting = new MutableShardRouting("test", 0, "node1", null, null, true, ShardRoutingState.STARTED, 1);
|
||||
secondRouting = new MutableShardRouting("test", 1, "node1", "node2", null, true, ShardRoutingState.RELOCATING, 1);
|
||||
firstRoutingNode = new RoutingNode("node1", discoveryNode1, Arrays.asList(firstRouting, secondRouting));
|
||||
builder = RoutingTable.builder().add(
|
||||
IndexRoutingTable.builder("test")
|
||||
|
|
|
@ -159,7 +159,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
assertEquals(shardStateMetaData, new ShardStateMetaData(routing.version(), routing.primary(), shard.indexSettings.get(IndexMetaData.SETTING_UUID)));
|
||||
|
||||
// test if we still write it even if the shard is not active
|
||||
MutableShardRouting inactiveRouting = new MutableShardRouting(shard.shardRouting.index(), shard.shardRouting.shardId().id(), shard.shardRouting.currentNodeId(), true, ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
|
||||
MutableShardRouting inactiveRouting = new MutableShardRouting(shard.shardRouting.index(), shard.shardRouting.shardId().id(), shard.shardRouting.currentNodeId(), null, null, true, ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
|
||||
shard.persistMetadata(inactiveRouting, shard.shardRouting);
|
||||
shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
|
||||
assertEquals("inactive shard state shouldn't be persisted", shardStateMetaData, getShardStateMetadata(shard));
|
||||
|
@ -197,7 +197,7 @@ public class IndexShardTests extends ElasticsearchSingleNodeTest {
|
|||
ShardStateMetaData shardStateMetaData = load(logger, env.availableShardPaths(shard.shardId));
|
||||
assertEquals(shardStateMetaData, getShardStateMetadata(shard));
|
||||
|
||||
routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
|
||||
routing = new MutableShardRouting(shard.shardId.index().getName(), shard.shardId.id(), routing.currentNodeId(), null, null, routing.primary(), ShardRoutingState.INITIALIZING, shard.shardRouting.version() + 1);
|
||||
shard.updateRoutingEntry(routing, true);
|
||||
shard.deleteShardState();
|
||||
|
||||
|
|
Loading…
Reference in New Issue