The reroute command allows to explcitiyly execute a cluster reroute allocation command including specific commands. For example, a shard can be moved from one node to another explicitly, an allocation can be canceled, or an unassigned shard can be explicitly allocated on a specific node.

Here is a short example of how a simple reroute API call:

    curl -XPOST 'localhost:9200/_cluster/reroute' -d '{
        "commands" : [
            {"move" : {"index" : "test", "shard" : 0, "from_node" : "node1", "to_node" : "node2"}},
            {"allocate" : {"index" : "test", "shard" : 1, "node" : "node3"}}
        ]
    }'

An importnat aspect to remember is the fact that once when an allocation occurs, the cluster will aim at rebalancing its state back to an even state. For example, if the allocation includes moving a shard from `node1` to `node2`, in an "even" state, then another shard will be moved from `node2` to `node1` to even things out.

The cluster can be set to disable allocations, which means that only the explicitl allocations will be performed. Obviously, only once all commands has been applied, the cluster will aim to be rebalance its state.

Anohter option is to run the commands in "dry_run" (as a URI flag, or in the request body). This will cause the commands to apply to the current cluster state, and reutrn the resulting cluster after the comamnds (and rebalancing) has been applied.

The commands supporterd are:

* `move`: Move a started shard from one node to anotehr node. Accepts `index` and `shard` for index name and shard number, `from_node` for the node to move the shard "from", and `to_node` for the node to move the shard to.
* `cancel`: Cancel allocation of a shard (or recovery). Accepts `index` and `shard` for index name and shar number, and `node` for the node to cancel the shard allocation on.
* `allocate`: Allocate an unassigned shard to a node. Accepts the `index` and `shard` for index name and shard number, and `node` to allocate the shard to. It also accepts `allow_primary` flag to explciitly specify that it is allowed to explciitly allocate a primary shard (might result in data loss).

closes #2256
This commit is contained in:
Shay Banon 2012-09-15 22:56:14 +02:00
parent 19fdd46c87
commit afca5ef15f
13 changed files with 424 additions and 37 deletions

View File

@ -19,10 +19,16 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
@ -30,9 +36,65 @@ import java.io.IOException;
*/
public class ClusterRerouteRequest extends MasterNodeOperationRequest {
AllocationCommands commands = new AllocationCommands();
boolean dryRun;
public ClusterRerouteRequest() {
}
/**
* Adds allocation commands to be applied to the cluster. Note, can be empty, in which case
* will simply run a simple "reroute".
*/
public ClusterRerouteRequest add(AllocationCommand... commands) {
this.commands.add(commands);
return this;
}
/**
* Sets a dry run flag (defaults to <tt>false</tt>) allowing to run the commands without
* actually applying them to the cluster state, and getting the resulting cluster state back.
*/
public ClusterRerouteRequest dryRun(boolean dryRun) {
this.dryRun = dryRun;
return this;
}
public boolean dryRun() {
return this.dryRun;
}
/**
* Sets the source for the request.
*/
public ClusterRerouteRequest source(BytesReference source) throws Exception {
XContentParser parser = XContentHelper.createParser(source);
try {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if (token == XContentParser.Token.START_ARRAY) {
if ("commands".equals(currentFieldName)) {
this.commands = AllocationCommands.fromXContent(parser);
} else {
throw new ElasticSearchParseException("failed to parse reroute request, got start array with wrong field name [" + currentFieldName + "]");
}
} else if (token.isValue()) {
if ("dry_run".equals(currentFieldName) || "dryRun".equals(currentFieldName)) {
dryRun = parser.booleanValue();
} else {
throw new ElasticSearchParseException("failed to parse reroute request, got value with wrong field name [" + currentFieldName + "]");
}
}
}
} finally {
parser.close();
}
return this;
}
@Override
public ActionRequestValidationException validate() {
return null;
@ -41,10 +103,14 @@ public class ClusterRerouteRequest extends MasterNodeOperationRequest {
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
commands = AllocationCommands.readFrom(in);
dryRun = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
AllocationCommands.writeTo(commands, out);
out.writeBoolean(dryRun);
}
}

View File

@ -22,6 +22,8 @@ package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.support.BaseClusterRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
/**
@ -32,6 +34,29 @@ public class ClusterRerouteRequestBuilder extends BaseClusterRequestBuilder<Clus
super(clusterClient, new ClusterRerouteRequest());
}
/**
* Adds allocation commands to be applied to the cluster. Note, can be empty, in which case
* will simply run a simple "reroute".
*/
public ClusterRerouteRequestBuilder add(AllocationCommand... commands) {
request.add(commands);
return this;
}
/**
* Sets a dry run flag (defaults to <tt>false</tt>) allowing to run the commands without
* actually applying them to the cluster state, and getting the resulting cluster state back.
*/
public ClusterRerouteRequestBuilder setDryRun(boolean dryRun) {
request.dryRun(dryRun);
return this;
}
public ClusterRerouteRequestBuilder setSource(BytesReference source) throws Exception {
request.source(source);
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/

View File

@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -29,14 +30,31 @@ import java.io.IOException;
*/
public class ClusterRerouteResponse implements ActionResponse {
private ClusterState state;
ClusterRerouteResponse() {
}
ClusterRerouteResponse(ClusterState state) {
this.state = state;
}
public ClusterState state() {
return this.state;
}
public ClusterState getState() {
return this.state;
}
@Override
public void readFrom(StreamInput in) throws IOException {
state = ClusterState.Builder.readFrom(in, null);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
ClusterState.Builder.writeTo(state, out);
}
}

View File

@ -70,19 +70,26 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
}
@Override
protected ClusterRerouteResponse masterOperation(ClusterRerouteRequest request, ClusterState state) throws ElasticSearchException {
protected ClusterRerouteResponse masterOperation(final ClusterRerouteRequest request, ClusterState state) throws ElasticSearchException {
final AtomicReference<Throwable> failureRef = new AtomicReference<Throwable>();
final AtomicReference<ClusterState> clusterStateResponse = new AtomicReference<ClusterState>();
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("cluster_reroute (api)", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
try {
RoutingAllocation.Result routingResult = allocationService.reroute(currentState);
return newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
RoutingAllocation.Result routingResult = allocationService.reroute(currentState, request.commands);
ClusterState newState = newClusterStateBuilder().state(currentState).routingResult(routingResult).build();
clusterStateResponse.set(newState);
if (request.dryRun) {
return currentState;
}
return newState;
} catch (Exception e) {
logger.debug("failed to reroute", e);
failureRef.set(e);
latch.countDown();
logger.warn("failed to reroute", e);
return currentState;
} finally {
// we don't release the latch here, only after we rerouted
@ -109,7 +116,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
}
}
return new ClusterRerouteResponse();
return new ClusterRerouteResponse(clusterStateResponse.get());
}
}

View File

@ -113,7 +113,11 @@ public class AllocationService extends AbstractComponent {
// a consistent result of the effect the commands have on the routing
// this allows systems to dry run the commands, see the resulting cluster state, and act on it
RoutingAllocation allocation = new RoutingAllocation(allocationDeciders, routingNodes, clusterState.nodes());
// we ignore disable allocation, because commands are explicit
allocation.ignoreDisable(true);
commands.execute(allocation);
// we revert the ignore disable flag, since when rerouting, we want the original setting to take place
allocation.ignoreDisable(false);
// the assumption is that commands will move / act on shards (or fail through exceptions)
// so, there will always be shard "movements", so no need to check on reroute
reroute(allocation);

View File

@ -71,7 +71,7 @@ public class RoutingAllocation {
private Map<ShardId, String> ignoredShardToNodes = null;
private Map<ShardId, String> ignoreDisable = null;
private boolean ignoreDisable = false;
public RoutingAllocation(AllocationDeciders deciders, RoutingNodes routingNodes, DiscoveryNodes nodes) {
this.deciders = deciders;
@ -103,15 +103,12 @@ public class RoutingAllocation {
return explanation;
}
public void addIgnoreDisable(ShardId shardId, String nodeId) {
if (ignoreDisable == null) {
ignoreDisable = new HashMap<ShardId, String>();
}
ignoreDisable.put(shardId, nodeId);
public void ignoreDisable(boolean ignoreDisable) {
this.ignoreDisable = ignoreDisable;
}
public boolean shouldIgnoreDisable(ShardId shardId, String nodeId) {
return ignoreDisable != null && nodeId.equals(ignoreDisable.get(shardId));
public boolean ignoreDisable() {
return this.ignoreDisable;
}
public void addIgnoreShardForNode(ShardId shardId, String nodeId) {

View File

@ -160,7 +160,6 @@ public class AllocateAllocationCommand implements AllocationCommand {
}
RoutingNode routingNode = allocation.routingNodes().node(discoNode.id());
allocation.addIgnoreDisable(shardRouting.shardId(), routingNode.nodeId());
if (!allocation.deciders().canAllocate(shardRouting, routingNode, allocation).allowed()) {
throw new ElasticSearchIllegalArgumentException("[allocate] allocation of " + shardId + " on node " + discoNode + " is not allowed");
}

View File

@ -126,25 +126,25 @@ public class AllocationCommands {
public static AllocationCommands fromXContent(XContentParser parser) throws IOException {
AllocationCommands commands = new AllocationCommands();
XContentParser.Token token = parser.nextToken();
XContentParser.Token token = parser.currentToken();
if (token == null) {
throw new ElasticSearchParseException("No commands");
}
if (token != XContentParser.Token.START_OBJECT) {
throw new ElasticSearchParseException("No start object, got " + token);
}
token = parser.nextToken();
if (token != XContentParser.Token.FIELD_NAME) {
throw new ElasticSearchParseException("expected the field name `commands` to exists, got " + token);
}
if (!parser.currentName().equals("commands")) {
throw new ElasticSearchParseException("expected field name to be named `commands`, got " + parser.currentName());
}
token = parser.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticSearchParseException("commands should follow with an array element");
if (token == XContentParser.Token.FIELD_NAME) {
if (!parser.currentName().equals("commands")) {
throw new ElasticSearchParseException("expected field name to be named `commands`, got " + parser.currentName());
}
if (!parser.currentName().equals("commands")) {
throw new ElasticSearchParseException("expected field name to be named `commands`, got " + parser.currentName());
}
token = parser.nextToken();
if (token != XContentParser.Token.START_ARRAY) {
throw new ElasticSearchParseException("commands should follow with an array element");
}
} else if (token == XContentParser.Token.START_ARRAY) {
// ok...
} else {
throw new ElasticSearchParseException("expected either field name commands, or start array, got " + token);
}
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token == XContentParser.Token.START_OBJECT) {
@ -165,7 +165,6 @@ public class AllocationCommands {
}
public static void toXContent(AllocationCommands commands, XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startObject();
builder.startArray("commands");
for (AllocationCommand command : commands.commands) {
builder.startObject();
@ -174,6 +173,5 @@ public class AllocationCommands {
builder.endObject();
}
builder.endArray();
builder.endObject();
}
}

View File

@ -70,10 +70,10 @@ public class DisableAllocationDecider extends AllocationDecider {
@Override
public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) {
if (disableAllocation) {
return allocation.shouldIgnoreDisable(shardRouting.shardId(), node.nodeId()) ? Decision.YES : Decision.NO;
return allocation.ignoreDisable() ? Decision.YES : Decision.NO;
}
if (disableReplicaAllocation) {
return shardRouting.primary() ? Decision.YES : allocation.shouldIgnoreDisable(shardRouting.shardId(), node.nodeId()) ? Decision.YES : Decision.NO;
return shardRouting.primary() ? Decision.YES : allocation.ignoreDisable() ? Decision.YES : Decision.NO;
}
return Decision.YES;
}

View File

@ -27,7 +27,9 @@ import org.elasticsearch.client.Requests;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
@ -35,10 +37,13 @@ import java.io.IOException;
*/
public class RestClusterRerouteAction extends BaseRestHandler {
private final SettingsFilter settingsFilter;
@Inject
public RestClusterRerouteAction(Settings settings, Client client, RestController controller,
SettingsFilter settingsFilter) {
super(settings, client);
this.settingsFilter = settingsFilter;
controller.registerHandler(RestRequest.Method.POST, "/_cluster/reroute", this);
}
@ -46,11 +51,37 @@ public class RestClusterRerouteAction extends BaseRestHandler {
public void handleRequest(final RestRequest request, final RestChannel channel) {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
if (request.hasContent()) {
try {
clusterRerouteRequest.source(request.content());
} catch (Exception e) {
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.warn("Failed to send response", e1);
}
return;
}
}
client.admin().cluster().reroute(clusterRerouteRequest, new ActionListener<ClusterRerouteResponse>() {
@Override
public void onResponse(ClusterRerouteResponse response) {
try {
channel.sendResponse(new StringRestResponse(RestStatus.OK));
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
builder.startObject("state");
// by default, filter metadata
if (request.param("filter_metadata") == null) {
request.params().put("filter_metadata", "true");
}
response.state().settingsFilter(settingsFilter).toXContent(builder, request);
builder.endObject();
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Exception e) {
onFailure(e);
}

View File

@ -0,0 +1,162 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.integration.cluster.allocation;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.AllocateAllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.gateway.Gateway;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.test.integration.AbstractNodesTests;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Test
public class ClusterRerouteTests extends AbstractNodesTests {
private final ESLogger logger = Loggers.getLogger(ClusterRerouteTests.class);
@AfterMethod
public void cleanAndCloseNodes() throws Exception {
for (int i = 0; i < 10; i++) {
if (node("node" + i) != null) {
node("node" + i).stop();
// since we store (by default) the index snapshot under the gateway, resetting it will reset the index data as well
((InternalNode) node("node" + i)).injector().getInstance(Gateway.class).reset();
}
}
closeAllNodes();
}
@Test
public void rerouteWithCommands() throws Exception {
Settings commonSettings = settingsBuilder()
.put("cluster.routing.allocation.disable_allocation", true)
.build();
startNode("node1", commonSettings);
startNode("node2", commonSettings);
logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate");
client("node1").admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1))
.execute().actionGet();
ClusterState state = client("node1").admin().cluster().prepareState().execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(2));
logger.info("--> explicitly allocate shard 1, *under dry_run*");
state = client("node1").admin().cluster().prepareReroute()
.add(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true))
.setDryRun(true)
.execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
logger.info("--> get the state, verify nothing changed because of the dry run");
state = client("node1").admin().cluster().prepareState().execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(2));
logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
state = client("node1").admin().cluster().prepareReroute()
.add(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true))
.execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
ClusterHealthResponse healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
assertThat(healthResponse.timedOut(), equalTo(false));
logger.info("--> get the state, verify shard 1 primary allocated");
state = client("node1").admin().cluster().prepareState().execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
logger.info("--> move shard 1 primary from node1 to node2");
state = client("node1").admin().cluster().prepareReroute()
.add(new MoveAllocationCommand(new ShardId("test", 0), "node1", "node2"))
.execute().actionGet().state();
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.RELOCATING));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node2").id()).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForYellowStatus().setWaitForRelocatingShards(0).execute().actionGet();
assertThat(healthResponse.timedOut(), equalTo(false));
logger.info("--> get the state, verify shard 1 primary moved from node1 to node2");
state = client("node1").admin().cluster().prepareState().execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node2").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
}
@Test
public void rerouteWithAllocateLocalGateway() throws Exception {
Settings commonSettings = settingsBuilder()
.put("cluster.routing.allocation.disable_allocation", true)
.put("gateway.type", "local")
.build();
// clean three nodes
logger.info("--> cleaning nodes");
buildNode("node1", settingsBuilder().put("gateway.type", "local").build());
buildNode("node2", settingsBuilder().put("gateway.type", "local").build());
cleanAndCloseNodes();
logger.info("--> starting 2 nodes");
startNode("node1", commonSettings);
startNode("node2", commonSettings);
logger.info("--> create an index with 1 shard, 1 replica, nothing should allocate");
client("node1").admin().indices().prepareCreate("test")
.setSettings(settingsBuilder().put("index.number_of_shards", 1))
.execute().actionGet();
ClusterState state = client("node1").admin().cluster().prepareState().execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(2));
logger.info("--> explicitly allocate shard 1, actually allocating, no dry run");
state = client("node1").admin().cluster().prepareReroute()
.add(new AllocateAllocationCommand(new ShardId("test", 0), "node1", true))
.execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.INITIALIZING));
ClusterHealthResponse healthResponse = client("node1").admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
assertThat(healthResponse.timedOut(), equalTo(false));
logger.info("--> get the state, verify shard 1 primary allocated");
state = client("node1").admin().cluster().prepareState().execute().actionGet().state();
assertThat(state.routingNodes().unassigned().size(), equalTo(1));
assertThat(state.routingNodes().node(state.nodes().resolveNode("node1").id()).shards().get(0).state(), equalTo(ShardRoutingState.STARTED));
}
}

View File

@ -0,0 +1,75 @@
/*
* Licensed to ElasticSearch and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. ElasticSearch licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.test.unit.cluster.routing.allocation;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.testng.annotations.Test;
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
import static org.elasticsearch.cluster.metadata.IndexMetaData.newIndexMetaDataBuilder;
import static org.elasticsearch.cluster.metadata.MetaData.newMetaDataBuilder;
import static org.elasticsearch.cluster.node.DiscoveryNodes.newNodesBuilder;
import static org.elasticsearch.cluster.routing.RoutingBuilders.indexRoutingTable;
import static org.elasticsearch.cluster.routing.RoutingBuilders.routingTable;
import static org.elasticsearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.unit.cluster.routing.allocation.RoutingAllocationTests.newNode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
/**
*/
@Test
public class AllocatePostApiFlagTests {
private final ESLogger logger = Loggers.getLogger(AllocatePostApiFlagTests.class);
@Test
public void simpleFlagTests() {
AllocationService allocation = new AllocationService(settingsBuilder().put("cluster.routing.allocation.concurrent_recoveries", 10).build());
logger.info("creating an index with 1 shard, no replica");
MetaData metaData = newMetaDataBuilder()
.put(newIndexMetaDataBuilder("test").numberOfShards(1).numberOfReplicas(0))
.build();
RoutingTable routingTable = routingTable()
.add(indexRoutingTable("test").initializeEmpty(metaData.index("test")))
.build();
ClusterState clusterState = newClusterStateBuilder().metaData(metaData).routingTable(routingTable).build();
assertThat(clusterState.routingTable().index("test").shard(0).allocatedPostApi(), equalTo(false));
logger.info("adding two nodes and performing rerouting");
clusterState = newClusterStateBuilder().state(clusterState).nodes(newNodesBuilder().put(newNode("node1")).put(newNode("node2"))).build();
RoutingAllocation.Result rerouteResult = allocation.reroute(clusterState);
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingTable().index("test").shard(0).allocatedPostApi(), equalTo(false));
logger.info("start primary shard");
rerouteResult = allocation.applyStartedShards(clusterState, clusterState.routingNodes().shardsWithState(INITIALIZING));
clusterState = newClusterStateBuilder().state(clusterState).routingTable(rerouteResult.routingTable()).build();
assertThat(clusterState.routingTable().index("test").shard(0).allocatedPostApi(), equalTo(true));
}
}

View File

@ -35,6 +35,7 @@ import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.logging.ESLogger;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.shard.ShardId;
import org.testng.annotations.Test;
@ -184,7 +185,7 @@ public class AllocationCommandsTests {
}
@Test
public void cacnelCommand() {
public void cancelCommand() {
AllocationService allocation = new AllocationService(settingsBuilder()
.put("cluster.routing.allocation.disable_allocation", true)
.build());
@ -322,7 +323,11 @@ public class AllocationCommandsTests {
" ,{\"cancel\" : {\"index\" : \"test\", \"shard\" : 4, \"node\" : \"node5\"}} \n" +
" ]\n" +
"}\n";
AllocationCommands sCommands = AllocationCommands.fromXContent(XContentFactory.xContent(XContentType.JSON).createParser(commands));
XContentParser parser = XContentFactory.xContent(XContentType.JSON).createParser(commands);
// move two tokens, parser expected to be "on" `commands` field
parser.nextToken();
parser.nextToken();
AllocationCommands sCommands = AllocationCommands.fromXContent(parser);
assertThat(sCommands.commands().size(), equalTo(3));
assertThat(((AllocateAllocationCommand) (sCommands.commands().get(0))).shardId(), equalTo(new ShardId("test", 1)));