cancel allocation command to allow_primary to be cancelled

This commit is contained in:
Shay Banon 2012-09-17 12:31:33 +02:00
parent 42864f7481
commit 90e0a70e0e
2 changed files with 51 additions and 14 deletions

View File

@ -51,13 +51,14 @@ public class CancelAllocationCommand implements AllocationCommand {
@Override @Override
public CancelAllocationCommand readFrom(StreamInput in) throws IOException { public CancelAllocationCommand readFrom(StreamInput in) throws IOException {
return new CancelAllocationCommand(ShardId.readShardId(in), in.readString()); return new CancelAllocationCommand(ShardId.readShardId(in), in.readString(), in.readBoolean());
} }
@Override @Override
public void writeTo(CancelAllocationCommand command, StreamOutput out) throws IOException { public void writeTo(CancelAllocationCommand command, StreamOutput out) throws IOException {
command.shardId().writeTo(out); command.shardId().writeTo(out);
out.writeString(command.node()); out.writeString(command.node());
out.writeBoolean(command.allowPrimary());
} }
@Override @Override
@ -65,6 +66,7 @@ public class CancelAllocationCommand implements AllocationCommand {
String index = null; String index = null;
int shardId = -1; int shardId = -1;
String nodeId = null; String nodeId = null;
boolean allowPrimary = false;
String currentFieldName = null; String currentFieldName = null;
XContentParser.Token token; XContentParser.Token token;
@ -78,6 +80,8 @@ public class CancelAllocationCommand implements AllocationCommand {
shardId = parser.intValue(); shardId = parser.intValue();
} else if ("node".equals(currentFieldName)) { } else if ("node".equals(currentFieldName)) {
nodeId = parser.text(); nodeId = parser.text();
} else if ("allow_primary".equals(currentFieldName) || "allowPrimary".equals(currentFieldName)) {
allowPrimary = parser.booleanValue();
} else { } else {
throw new ElasticSearchParseException("[cancel] command does not support field [" + currentFieldName + "]"); throw new ElasticSearchParseException("[cancel] command does not support field [" + currentFieldName + "]");
} }
@ -94,7 +98,7 @@ public class CancelAllocationCommand implements AllocationCommand {
if (nodeId == null) { if (nodeId == null) {
throw new ElasticSearchParseException("[cancel] command missing the node parameter"); throw new ElasticSearchParseException("[cancel] command missing the node parameter");
} }
return new CancelAllocationCommand(new ShardId(index, shardId), nodeId); return new CancelAllocationCommand(new ShardId(index, shardId), nodeId, allowPrimary);
} }
@Override @Override
@ -103,6 +107,7 @@ public class CancelAllocationCommand implements AllocationCommand {
builder.field("index", command.shardId().index()); builder.field("index", command.shardId().index());
builder.field("shard", command.shardId().id()); builder.field("shard", command.shardId().id());
builder.field("node", command.node()); builder.field("node", command.node());
builder.field("allow_primary", command.allowPrimary());
builder.endObject(); builder.endObject();
} }
} }
@ -110,10 +115,12 @@ public class CancelAllocationCommand implements AllocationCommand {
private final ShardId shardId; private final ShardId shardId;
private final String node; private final String node;
private final boolean allowPrimary;
public CancelAllocationCommand(ShardId shardId, String node) { public CancelAllocationCommand(ShardId shardId, String node, boolean allowPrimary) {
this.shardId = shardId; this.shardId = shardId;
this.node = node; this.node = node;
this.allowPrimary = allowPrimary;
} }
@Override @Override
@ -129,6 +136,10 @@ public class CancelAllocationCommand implements AllocationCommand {
return this.node; return this.node;
} }
public boolean allowPrimary() {
return this.allowPrimary;
}
@Override @Override
public void execute(RoutingAllocation allocation) throws ElasticSearchException { public void execute(RoutingAllocation allocation) throws ElasticSearchException {
DiscoveryNode discoNode = allocation.nodes().resolveNode(node); DiscoveryNode discoNode = allocation.nodes().resolveNode(node);
@ -157,7 +168,7 @@ public class CancelAllocationCommand implements AllocationCommand {
} }
} else if (shardRouting.relocating()) { } else if (shardRouting.relocating()) {
// the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one // the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one
if (shardRouting.primary()) { if (!allowPrimary && shardRouting.primary()) {
// can't cancel a primary shard being initialized // can't cancel a primary shard being initialized
throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state"); throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state");
} }
@ -179,9 +190,9 @@ public class CancelAllocationCommand implements AllocationCommand {
} }
} else { } else {
// the shard is not relocating, its either started, or initializing, just cancel it and move on... // the shard is not relocating, its either started, or initializing, just cancel it and move on...
if (shardRouting.primary()) { if (!allowPrimary && shardRouting.primary()) {
// can't cancel a primary shard being initialized // can't cancel a primary shard being initialized
throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state"); throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and started");
} }
it.remove(); it.remove();
allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(), allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(),

View File

@ -219,7 +219,7 @@ public class AllocationCommandsTests {
logger.info("--> cancel primary allocation, make sure it fails..."); logger.info("--> cancel primary allocation, make sure it fails...");
try { try {
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1"))); allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", false)));
assert false; assert false;
} catch (ElasticSearchIllegalArgumentException e) { } catch (ElasticSearchIllegalArgumentException e) {
} }
@ -233,7 +233,7 @@ public class AllocationCommandsTests {
logger.info("--> cancel primary allocation, make sure it fails..."); logger.info("--> cancel primary allocation, make sure it fails...");
try { try {
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1"))); allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", false)));
assert false; assert false;
} catch (ElasticSearchIllegalArgumentException e) { } catch (ElasticSearchIllegalArgumentException e) {
} }
@ -248,7 +248,7 @@ public class AllocationCommandsTests {
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> cancel the relocation allocation"); logger.info("--> cancel the relocation allocation");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2"))); rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2", false)));
assertThat(rerouteResult.changed(), equalTo(true)); assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
@ -265,9 +265,9 @@ public class AllocationCommandsTests {
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> cancel the primary being relocated, make sure it fails"); logger.info("--> cancel the primary being replicated, make sure it fails");
try { try {
allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1"))); allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", false)));
assert false; assert false;
} catch (ElasticSearchIllegalArgumentException e) { } catch (ElasticSearchIllegalArgumentException e) {
} }
@ -281,13 +281,37 @@ public class AllocationCommandsTests {
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
logger.info("--> cancel allocation of the replica shard"); logger.info("--> cancel allocation of the replica shard");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2"))); rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2", false)));
assertThat(rerouteResult.changed(), equalTo(true)); assertThat(rerouteResult.changed(), equalTo(true));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0)); assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0)); assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
logger.info("--> allocate the replica shard on on the second node");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false)));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(rerouteResult.changed(), equalTo(true));
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
logger.info("--> start the replica shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
logger.info("--> cancel the primary allocation (with allow_primary set to true)");
rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", true)));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(rerouteResult.changed(), equalTo(true));
assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).get(0).primary(), equalTo(true));
assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(0));
assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0));
} }
@Test @Test
@ -295,7 +319,7 @@ public class AllocationCommandsTests {
AllocationCommands commands = new AllocationCommands( AllocationCommands commands = new AllocationCommands(
new AllocateAllocationCommand(new ShardId("test", 1), "node1", true), new AllocateAllocationCommand(new ShardId("test", 1), "node1", true),
new MoveAllocationCommand(new ShardId("test", 3), "node2", "node3"), new MoveAllocationCommand(new ShardId("test", 3), "node2", "node3"),
new CancelAllocationCommand(new ShardId("test", 4), "node5") new CancelAllocationCommand(new ShardId("test", 4), "node5", true)
); );
BytesStreamOutput bytes = new BytesStreamOutput(); BytesStreamOutput bytes = new BytesStreamOutput();
AllocationCommands.writeTo(commands, bytes); AllocationCommands.writeTo(commands, bytes);
@ -312,6 +336,7 @@ public class AllocationCommandsTests {
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4))); assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4)));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5")); assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true));
} }
@Test @Test
@ -320,7 +345,7 @@ public class AllocationCommandsTests {
" \"commands\" : [\n" + " \"commands\" : [\n" +
" {\"allocate\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"allow_primary\" : true}}\n" + " {\"allocate\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"allow_primary\" : true}}\n" +
" ,{\"move\" : {\"index\" : \"test\", \"shard\" : 3, \"from_node\" : \"node2\", \"to_node\" : \"node3\"}} \n" + " ,{\"move\" : {\"index\" : \"test\", \"shard\" : 3, \"from_node\" : \"node2\", \"to_node\" : \"node3\"}} \n" +
" ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\"}} \n" + " ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\", \"allow_primary\" : true}} \n" +
" ]\n" + " ]\n" +
"}\n"; "}\n";
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(commands); XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(commands);
@ -340,5 +365,6 @@ public class AllocationCommandsTests {
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4))); assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4)));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5")); assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5"));
assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true));
} }
} }