From 90e0a70e0eb2f7af866c124b6a5757deda8e217a Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Mon, 17 Sep 2012 12:31:33 +0200 Subject: [PATCH] cancel allocation command to allow_primary to be cancelled --- .../command/CancelAllocationCommand.java | 23 +++++++--- .../allocation/AllocationCommandsTests.java | 42 +++++++++++++++---- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java index 1bf12f23cd2..2e5421d0b2e 100644 --- a/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java +++ b/src/main/java/org/elasticsearch/cluster/routing/allocation/command/CancelAllocationCommand.java @@ -51,13 +51,14 @@ public class CancelAllocationCommand implements AllocationCommand { @Override public CancelAllocationCommand readFrom(StreamInput in) throws IOException { - return new CancelAllocationCommand(ShardId.readShardId(in), in.readString()); + return new CancelAllocationCommand(ShardId.readShardId(in), in.readString(), in.readBoolean()); } @Override public void writeTo(CancelAllocationCommand command, StreamOutput out) throws IOException { command.shardId().writeTo(out); out.writeString(command.node()); + out.writeBoolean(command.allowPrimary()); } @Override @@ -65,6 +66,7 @@ public class CancelAllocationCommand implements AllocationCommand { String index = null; int shardId = -1; String nodeId = null; + boolean allowPrimary = false; String currentFieldName = null; XContentParser.Token token; @@ -78,6 +80,8 @@ public class CancelAllocationCommand implements AllocationCommand { shardId = parser.intValue(); } else if ("node".equals(currentFieldName)) { nodeId = parser.text(); + } else if ("allow_primary".equals(currentFieldName) || "allowPrimary".equals(currentFieldName)) { + allowPrimary = parser.booleanValue(); } else { throw new ElasticSearchParseException("[cancel] command does not support field [" + currentFieldName + "]"); } @@ -94,7 +98,7 @@ public class CancelAllocationCommand implements AllocationCommand { if (nodeId == null) { throw new ElasticSearchParseException("[cancel] command missing the node parameter"); } - return new CancelAllocationCommand(new ShardId(index, shardId), nodeId); + return new CancelAllocationCommand(new ShardId(index, shardId), nodeId, allowPrimary); } @Override @@ -103,6 +107,7 @@ public class CancelAllocationCommand implements AllocationCommand { builder.field("index", command.shardId().index()); builder.field("shard", command.shardId().id()); builder.field("node", command.node()); + builder.field("allow_primary", command.allowPrimary()); builder.endObject(); } } @@ -110,10 +115,12 @@ public class CancelAllocationCommand implements AllocationCommand { private final ShardId shardId; private final String node; + private final boolean allowPrimary; - public CancelAllocationCommand(ShardId shardId, String node) { + public CancelAllocationCommand(ShardId shardId, String node, boolean allowPrimary) { this.shardId = shardId; this.node = node; + this.allowPrimary = allowPrimary; } @Override @@ -129,6 +136,10 @@ public class CancelAllocationCommand implements AllocationCommand { return this.node; } + public boolean allowPrimary() { + return this.allowPrimary; + } + @Override public void execute(RoutingAllocation allocation) throws ElasticSearchException { DiscoveryNode discoNode = allocation.nodes().resolveNode(node); @@ -157,7 +168,7 @@ public class CancelAllocationCommand implements AllocationCommand { } } else if (shardRouting.relocating()) { // the shard is relocating to another node, cancel the recovery on the other node, and deallocate this one - if (shardRouting.primary()) { + if (!allowPrimary && shardRouting.primary()) { // can't cancel a primary shard being initialized throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state"); } @@ -179,9 +190,9 @@ public class CancelAllocationCommand implements AllocationCommand { } } else { // the shard is not relocating, its either started, or initializing, just cancel it and move on... - if (shardRouting.primary()) { + if (!allowPrimary && shardRouting.primary()) { // can't cancel a primary shard being initialized - throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and initializing its state"); + throw new ElasticSearchIllegalArgumentException("[cancel_allocation] can't cancel " + shardId + " on node " + discoNode + ", shard is primary and started"); } it.remove(); allocation.routingNodes().unassigned().add(new MutableShardRouting(shardRouting.index(), shardRouting.id(), diff --git a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AllocationCommandsTests.java b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AllocationCommandsTests.java index 0b9c3a9a23e..62515296b5c 100644 --- a/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AllocationCommandsTests.java +++ b/src/test/java/org/elasticsearch/test/unit/cluster/routing/allocation/AllocationCommandsTests.java @@ -219,7 +219,7 @@ public class AllocationCommandsTests { logger.info("--> cancel primary allocation, make sure it fails..."); try { - allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1"))); + allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", false))); assert false; } catch (ElasticSearchIllegalArgumentException e) { } @@ -233,7 +233,7 @@ public class AllocationCommandsTests { logger.info("--> cancel primary allocation, make sure it fails..."); try { - allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1"))); + allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", false))); assert false; } catch (ElasticSearchIllegalArgumentException e) { } @@ -248,7 +248,7 @@ public class AllocationCommandsTests { assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1)); logger.info("--> cancel the relocation allocation"); - rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2"))); + rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2", false))); assertThat(rerouteResult.changed(), equalTo(true)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1)); @@ -265,9 +265,9 @@ public class AllocationCommandsTests { assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1)); - logger.info("--> cancel the primary being relocated, make sure it fails"); + logger.info("--> cancel the primary being replicated, make sure it fails"); try { - allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1"))); + allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", false))); assert false; } catch (ElasticSearchIllegalArgumentException e) { } @@ -281,13 +281,37 @@ public class AllocationCommandsTests { assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1)); logger.info("--> cancel allocation of the replica shard"); - rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2"))); + rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node2", false))); assertThat(rerouteResult.changed(), equalTo(true)); clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1)); assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(0)); assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0)); + + logger.info("--> allocate the replica shard on on the second node"); + rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new AllocateAllocationCommand(new ShardId("test", 0), "node2", false))); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); + assertThat(rerouteResult.changed(), equalTo(true)); + assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1)); + logger.info("--> start the replica shard"); + rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING)); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); + assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(1)); + assertThat(clusterState.routingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1)); + assertThat(clusterState.routingNodes().node("node2").shards().size(), equalTo(1)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1)); + + logger.info("--> cancel the primary allocation (with allow_primary set to true)"); + rerouteResult = allocation.reroute(clusterState, new AllocationCommands(new CancelAllocationCommand(new ShardId("test", 0), "node1", true))); + clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build(); + assertThat(rerouteResult.changed(), equalTo(true)); + assertThat(clusterState.routingNodes().node("node2").shardsWithState(STARTED).get(0).primary(), equalTo(true)); + assertThat(clusterState.routingNodes().node("node1").shards().size(), equalTo(0)); + assertThat(clusterState.routingNodes().node("node3").shards().size(), equalTo(0)); } @Test @@ -295,7 +319,7 @@ public class AllocationCommandsTests { AllocationCommands commands = new AllocationCommands( new AllocateAllocationCommand(new ShardId("test", 1), "node1", true), new MoveAllocationCommand(new ShardId("test", 3), "node2", "node3"), - new CancelAllocationCommand(new ShardId("test", 4), "node5") + new CancelAllocationCommand(new ShardId("test", 4), "node5", true) ); BytesStreamOutput bytes = new BytesStreamOutput(); AllocationCommands.writeTo(commands, bytes); @@ -312,6 +336,7 @@ public class AllocationCommandsTests { assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4))); assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5")); + assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true)); } @Test @@ -320,7 +345,7 @@ public class AllocationCommandsTests { " \"commands\" : [\n" + " {\"allocate\" : {\"index\" : \"test\", \"shard\" : 1, \"node\" : \"node1\", \"allow_primary\" : true}}\n" + " ,{\"move\" : {\"index\" : \"test\", \"shard\" : 3, \"from_node\" : \"node2\", \"to_node\" : \"node3\"}} \n" + - " ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\"}} \n" + + " ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\", \"allow_primary\" : true}} \n" + " ]\n" + "}\n"; XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(commands); @@ -340,5 +365,6 @@ public class AllocationCommandsTests { assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).shardId(), equalTo(new ShardId("test", 4))); assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).node(), equalTo("node5")); + assertThat(((CancelAllocationCommand) (sCommands.commands().get(2))).allowPrimary(), equalTo(true)); } }