diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteRequest.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteRequest.java index 7c3fa091735..f2f32d9eb2e 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteRequest.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteRequest.java @@ -20,8 +20,9 @@ package org.elasticsearch.action.admin.cluster.reroute; import org.elasticsearch.ElasticSearchParseException; +import org.elasticsearch.Version; import org.elasticsearch.action.ActionRequestValidationException; -import org.elasticsearch.action.support.master.MasterNodeOperationRequest; +import org.elasticsearch.action.support.master.AcknowledgedRequest; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand; import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands; import org.elasticsearch.common.bytes.BytesReference; @@ -33,8 +34,9 @@ import org.elasticsearch.common.xcontent.XContentParser; import java.io.IOException; /** + * Request to submit cluster reroute allocation commands */ -public class ClusterRerouteRequest extends MasterNodeOperationRequest { +public class ClusterRerouteRequest extends AcknowledgedRequest { AllocationCommands commands = new AllocationCommands(); boolean dryRun; @@ -60,6 +62,10 @@ public class ClusterRerouteRequest extends MasterNodeOperationRequest { +public class ClusterRerouteRequestBuilder extends AcknowledgedRequestBuilder { public ClusterRerouteRequestBuilder(ClusterAdminClient clusterClient) { super((InternalClusterAdminClient) clusterClient, new ClusterRerouteRequest()); @@ -52,6 +53,9 @@ public class ClusterRerouteRequestBuilder extends MasterNodeOperationRequestBuil return this; } + /** + * Sets the source for the request + */ public ClusterRerouteRequestBuilder setSource(BytesReference source) throws Exception { request.source(source); return this; diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java index 07306efea2a..d022e0f34bf 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/ClusterRerouteResponse.java @@ -19,7 +19,8 @@ package org.elasticsearch.action.admin.cluster.reroute; -import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.Version; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -27,8 +28,9 @@ import org.elasticsearch.common.io.stream.StreamOutput; import java.io.IOException; /** + * Response returned after a cluster reroute request */ -public class ClusterRerouteResponse extends ActionResponse { +public class ClusterRerouteResponse extends AcknowledgedResponse { private ClusterState state; @@ -36,10 +38,14 @@ public class ClusterRerouteResponse extends ActionResponse { } - ClusterRerouteResponse(ClusterState state) { + ClusterRerouteResponse(boolean acknowledged, ClusterState state) { + super(acknowledged); this.state = state; } + /** + * Returns the cluster state resulted from the cluster reroute request execution + */ public ClusterState getState() { return this.state; } @@ -48,11 +54,13 @@ public class ClusterRerouteResponse extends ActionResponse { public void readFrom(StreamInput in) throws IOException { super.readFrom(in); state = ClusterState.Builder.readFrom(in, null); + readAcknowledged(in, Version.V_0_90_6); } @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); ClusterState.Builder.writeTo(state, out); + writeAcknowledged(out, Version.V_0_90_6); } } diff --git a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java index c3977455b20..cb52abedd68 100644 --- a/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java +++ b/src/main/java/org/elasticsearch/action/admin/cluster/reroute/TransportClusterRerouteAction.java @@ -22,11 +22,13 @@ package org.elasticsearch.action.admin.cluster.reroute; import org.elasticsearch.ElasticSearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; +import org.elasticsearch.cluster.AckedClusterStateUpdateTask; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask; +import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.allocation.AllocationService; import org.elasticsearch.cluster.routing.allocation.RoutingAllocation; +import org.elasticsearch.common.Nullable; import org.elasticsearch.common.Priority; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -72,10 +74,30 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA @Override protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener listener) throws ElasticSearchException { - clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new TimeoutClusterStateUpdateTask() { + clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new AckedClusterStateUpdateTask() { private volatile ClusterState clusterStateToSend; + @Override + public boolean mustAck(DiscoveryNode discoveryNode) { + return true; + } + + @Override + public void onAllNodesAcked(@Nullable Throwable t) { + listener.onResponse(new ClusterRerouteResponse(true, clusterStateToSend)); + } + + @Override + public void onAckTimeout() { + listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend)); + } + + @Override + public TimeValue ackTimeout() { + return request.timeout(); + } + @Override public TimeValue timeout() { return request.masterNodeTimeout(); @@ -100,7 +122,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA @Override public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { - listener.onResponse(new ClusterRerouteResponse(clusterStateToSend)); + } }); } diff --git a/src/main/java/org/elasticsearch/rest/AcknowledgedRestResponseActionListener.java b/src/main/java/org/elasticsearch/rest/AcknowledgedRestResponseActionListener.java index 67e95257db5..1eab161cad3 100644 --- a/src/main/java/org/elasticsearch/rest/AcknowledgedRestResponseActionListener.java +++ b/src/main/java/org/elasticsearch/rest/AcknowledgedRestResponseActionListener.java @@ -29,7 +29,7 @@ import static org.elasticsearch.rest.RestStatus.OK; /** */ -public final class AcknowledgedRestResponseActionListener extends AbstractRestResponseActionListener { +public class AcknowledgedRestResponseActionListener extends AbstractRestResponseActionListener { public AcknowledgedRestResponseActionListener(RestRequest request, RestChannel channel, ESLogger logger) { super(request, channel, logger); @@ -42,10 +42,19 @@ public final class AcknowledgedRestResponseActionListener() { + + client.admin().cluster().reroute(clusterRerouteRequest, new AcknowledgedRestResponseActionListener(request, channel, logger) { @Override - public void onResponse(ClusterRerouteResponse response) { - try { - XContentBuilder builder = RestXContentBuilder.restContentBuilder(request); - builder.startObject(); - - builder.field("ok", true); - builder.startObject("state"); - // by default, filter metadata - if (request.param("filter_metadata") == null) { - request.params().put("filter_metadata", "true"); - } - response.getState().settingsFilter(settingsFilter).toXContent(builder, request); - builder.endObject(); - - builder.endObject(); - channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder)); - } catch (Throwable e) { - onFailure(e); + protected void addCustomFields(XContentBuilder builder, ClusterRerouteResponse response) throws IOException { + builder.startObject("state"); + // by default, filter metadata + if (request.param("filter_metadata") == null) { + request.params().put("filter_metadata", "true"); } + response.getState().settingsFilter(settingsFilter).toXContent(builder, request); + builder.endObject(); } @Override @@ -93,11 +82,7 @@ public class RestClusterRerouteAction extends BaseRestHandler { if (logger.isDebugEnabled()) { logger.debug("failed to handle cluster reroute", e); } - try { - channel.sendResponse(new XContentThrowableRestResponse(request, e)); - } catch (IOException e1) { - logger.error("Failed to send failure response", e1); - } + super.onFailure(e); } }); } diff --git a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java index 36d4525b3a4..b2dc9332e97 100644 --- a/src/test/java/org/elasticsearch/cluster/ack/AckTests.java +++ b/src/test/java/org/elasticsearch/cluster/ack/AckTests.java @@ -20,6 +20,10 @@ package org.elasticsearch.cluster.ack; import com.google.common.collect.ImmutableList; +import org.elasticsearch.action.admin.cluster.node.info.NodeInfo; +import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse; +import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse; +import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; @@ -28,6 +32,12 @@ import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse; import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse; import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.MutableShardRouting; +import org.elasticsearch.cluster.routing.RoutingNode; +import org.elasticsearch.cluster.routing.ShardRoutingState; +import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.index.query.QueryBuilders; @@ -37,10 +47,10 @@ import org.junit.Test; import java.util.Map; +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope; -import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.notNullValue; +import static org.elasticsearch.test.AbstractIntegrationTest.Scope.SUITE; +import static org.hamcrest.Matchers.*; @ClusterScope(scope = SUITE) public class AckTests extends AbstractIntegrationTest { @@ -121,8 +131,111 @@ public class AckTests extends AbstractIntegrationTest { assertThat(deleteMappingResponse.isAcknowledged(), equalTo(true)); for (Client client : clients()) { - getMappingsResponse = client.admin().indices().prepareGetMappings("test").addTypes("type1").get(); + getMappingsResponse = client.admin().indices().prepareGetMappings("test").addTypes("type1").setLocal(true).get(); assertThat(getMappingsResponse.mappings().size(), equalTo(0)); } } + + @Test + public void testClusterRerouteAcknowledgement() throws InterruptedException { + client().admin().indices().prepareCreate("test") + .setSettings(settingsBuilder() + .put("number_of_shards", atLeast(cluster().numNodes())) + .put("number_of_replicas", 0)).get(); + ensureGreen(); + + + MoveAllocationCommand moveAllocationCommand = getAllocationCommand(); + + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().add(moveAllocationCommand).get(); + assertThat(clusterRerouteResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get(); + RoutingNode routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.fromNode()); + for (MutableShardRouting mutableShardRouting : routingNode) { + //if the shard that we wanted to move is still on the same node, it must be relocating + if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) { + assertThat(mutableShardRouting.relocating(), equalTo(true)); + } + + } + + routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.toNode()); + boolean found = false; + for (MutableShardRouting mutableShardRouting : routingNode) { + if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) { + assertThat(mutableShardRouting.state(), anyOf(equalTo(ShardRoutingState.INITIALIZING), equalTo(ShardRoutingState.STARTED))); + found = true; + break; + } + } + assertThat(found, equalTo(true)); + } + //let's wait for the relocation to be completed, otherwise there can be issues with after test checks (mock directory wrapper etc.) + waitForRelocation(); + } + + @Test + public void testClusterRerouteAcknowledgementDryRun() throws InterruptedException { + client().admin().indices().prepareCreate("test") + .setSettings(settingsBuilder() + .put("number_of_shards", atLeast(cluster().numNodes())) + .put("number_of_replicas", 0)).get(); + ensureGreen(); + + MoveAllocationCommand moveAllocationCommand = getAllocationCommand(); + + ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setDryRun(true).add(moveAllocationCommand).get(); + assertThat(clusterRerouteResponse.isAcknowledged(), equalTo(true)); + + for (Client client : clients()) { + ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get(); + RoutingNode routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.fromNode()); + for (MutableShardRouting mutableShardRouting : routingNode) { + //the shard that we wanted to move is still on the same node, as we had dryRun flag + if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) { + assertThat(mutableShardRouting.started(), equalTo(true)); + } + + } + + routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.toNode()); + boolean found = false; + for (MutableShardRouting mutableShardRouting : routingNode) { + if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) { + found = true; + break; + } + } + assertThat(found, equalTo(false)); + } + } + + private static MoveAllocationCommand getAllocationCommand() { + String fromNodeId = null; + String toNodeId = null; + MutableShardRouting shardToBeMoved = null; + ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get(); + for (RoutingNode routingNode : clusterStateResponse.getState().routingNodes().nodesToShards().values()) { + if (routingNode.node().isDataNode()) { + if (fromNodeId == null && routingNode.numberOfOwningShards() > 0) { + fromNodeId = routingNode.nodeId(); + shardToBeMoved = routingNode.shards().get(0); + } else { + toNodeId = routingNode.nodeId(); + } + + if (toNodeId != null && fromNodeId != null) { + break; + } + } + } + + assert fromNodeId != null; + assert toNodeId != null; + assert shardToBeMoved != null; + + return new MoveAllocationCommand(shardToBeMoved.shardId(), fromNodeId, toNodeId); + } }