Added support for acknowledgements to cluster reroute api

Closes #3985
This commit is contained in:
Luca Cavanna 2013-10-27 00:00:17 +02:00
parent f23a1e23b1
commit d10bd2495d
7 changed files with 190 additions and 41 deletions

View File

@ -20,8 +20,9 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.ElasticSearchParseException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommands;
import org.elasticsearch.common.bytes.BytesReference;
@ -33,8 +34,9 @@ import org.elasticsearch.common.xcontent.XContentParser;
import java.io.IOException;
/**
* Request to submit cluster reroute allocation commands
*/
public class ClusterRerouteRequest extends MasterNodeOperationRequest<ClusterRerouteRequest> {
public class ClusterRerouteRequest extends AcknowledgedRequest<ClusterRerouteRequest> {
AllocationCommands commands = new AllocationCommands();
boolean dryRun;
@ -60,6 +62,10 @@ public class ClusterRerouteRequest extends MasterNodeOperationRequest<ClusterRer
return this;
}
/**
* Returns the current dry run flag which allows to run the commands without actually applying them,
* just to get back the resulting cluster state back.
*/
public boolean dryRun() {
return this.dryRun;
}
@ -105,6 +111,7 @@ public class ClusterRerouteRequest extends MasterNodeOperationRequest<ClusterRer
super.readFrom(in);
commands = AllocationCommands.readFrom(in);
dryRun = in.readBoolean();
readTimeout(in, Version.V_0_90_6);
}
@Override
@ -112,5 +119,6 @@ public class ClusterRerouteRequest extends MasterNodeOperationRequest<ClusterRer
super.writeTo(out);
AllocationCommands.writeTo(commands, out);
out.writeBoolean(dryRun);
writeTimeout(out, Version.V_0_90_6);
}
}

View File

@ -20,15 +20,16 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.MasterNodeOperationRequestBuilder;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.internal.InternalClusterAdminClient;
import org.elasticsearch.cluster.routing.allocation.command.AllocationCommand;
import org.elasticsearch.common.bytes.BytesReference;
/**
* Builder for a cluster reroute request
*/
public class ClusterRerouteRequestBuilder extends MasterNodeOperationRequestBuilder<ClusterRerouteRequest, ClusterRerouteResponse, ClusterRerouteRequestBuilder> {
public class ClusterRerouteRequestBuilder extends AcknowledgedRequestBuilder<ClusterRerouteRequest, ClusterRerouteResponse, ClusterRerouteRequestBuilder> {
public ClusterRerouteRequestBuilder(ClusterAdminClient clusterClient) {
super((InternalClusterAdminClient) clusterClient, new ClusterRerouteRequest());
@ -52,6 +53,9 @@ public class ClusterRerouteRequestBuilder extends MasterNodeOperationRequestBuil
return this;
}
/**
* Sets the source for the request
*/
public ClusterRerouteRequestBuilder setSource(BytesReference source) throws Exception {
request.source(source);
return this;

View File

@ -19,7 +19,8 @@
package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.Version;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -27,8 +28,9 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import java.io.IOException;
/**
* Response returned after a cluster reroute request
*/
public class ClusterRerouteResponse extends ActionResponse {
public class ClusterRerouteResponse extends AcknowledgedResponse {
private ClusterState state;
@ -36,10 +38,14 @@ public class ClusterRerouteResponse extends ActionResponse {
}
ClusterRerouteResponse(ClusterState state) {
ClusterRerouteResponse(boolean acknowledged, ClusterState state) {
super(acknowledged);
this.state = state;
}
/**
* Returns the cluster state resulted from the cluster reroute request execution
*/
public ClusterState getState() {
return this.state;
}
@ -48,11 +54,13 @@ public class ClusterRerouteResponse extends ActionResponse {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
state = ClusterState.Builder.readFrom(in, null);
readAcknowledged(in, Version.V_0_90_6);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
ClusterState.Builder.writeTo(state, out);
writeAcknowledged(out, Version.V_0_90_6);
}
}

View File

@ -22,11 +22,13 @@ package org.elasticsearch.action.admin.cluster.reroute;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.TimeoutClusterStateUpdateTask;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.cluster.routing.allocation.RoutingAllocation;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
@ -72,10 +74,30 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
protected void masterOperation(final ClusterRerouteRequest request, final ClusterState state, final ActionListener<ClusterRerouteResponse> listener) throws ElasticSearchException {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new TimeoutClusterStateUpdateTask() {
clusterService.submitStateUpdateTask("cluster_reroute (api)", Priority.URGENT, new AckedClusterStateUpdateTask() {
private volatile ClusterState clusterStateToSend;
@Override
public boolean mustAck(DiscoveryNode discoveryNode) {
return true;
}
@Override
public void onAllNodesAcked(@Nullable Throwable t) {
listener.onResponse(new ClusterRerouteResponse(true, clusterStateToSend));
}
@Override
public void onAckTimeout() {
listener.onResponse(new ClusterRerouteResponse(false, clusterStateToSend));
}
@Override
public TimeValue ackTimeout() {
return request.timeout();
}
@Override
public TimeValue timeout() {
return request.masterNodeTimeout();
@ -100,7 +122,7 @@ public class TransportClusterRerouteAction extends TransportMasterNodeOperationA
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
listener.onResponse(new ClusterRerouteResponse(clusterStateToSend));
}
});
}

View File

@ -29,7 +29,7 @@ import static org.elasticsearch.rest.RestStatus.OK;
/**
*/
public final class AcknowledgedRestResponseActionListener<T extends AcknowledgedResponse> extends AbstractRestResponseActionListener<T> {
public class AcknowledgedRestResponseActionListener<T extends AcknowledgedResponse> extends AbstractRestResponseActionListener<T> {
public AcknowledgedRestResponseActionListener(RestRequest request, RestChannel channel, ESLogger logger) {
super(request, channel, logger);
@ -42,10 +42,19 @@ public final class AcknowledgedRestResponseActionListener<T extends Acknowledged
builder.startObject()
.field("ok", true)
.field("acknowledged", response.isAcknowledged());
addCustomFields(builder, response);
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, OK, builder));
} catch (IOException e) {
onFailure(e);
}
}
/**
* Adds api specific fields to the rest response
* Does nothing by default but can be overridden by subclasses
*/
protected void addCustomFields(XContentBuilder builder, T response) throws IOException {
}
}

View File

@ -19,7 +19,6 @@
package org.elasticsearch.rest.action.admin.cluster.reroute;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteRequest;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.client.Client;
@ -29,7 +28,6 @@ import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestXContentBuilder;
import java.io.IOException;
@ -52,6 +50,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
final ClusterRerouteRequest clusterRerouteRequest = Requests.clusterRerouteRequest();
clusterRerouteRequest.listenerThreaded(false);
clusterRerouteRequest.dryRun(request.paramAsBoolean("dry_run", clusterRerouteRequest.dryRun()));
clusterRerouteRequest.timeout(request.paramAsTime("timeout", clusterRerouteRequest.timeout()));
clusterRerouteRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterRerouteRequest.masterNodeTimeout()));
if (request.hasContent()) {
try {
@ -65,14 +64,10 @@ public class RestClusterRerouteAction extends BaseRestHandler {
return;
}
}
client.admin().cluster().reroute(clusterRerouteRequest, new ActionListener<ClusterRerouteResponse>() {
@Override
public void onResponse(ClusterRerouteResponse response) {
try {
XContentBuilder builder = RestXContentBuilder.restContentBuilder(request);
builder.startObject();
builder.field("ok", true);
client.admin().cluster().reroute(clusterRerouteRequest, new AcknowledgedRestResponseActionListener<ClusterRerouteResponse>(request, channel, logger) {
@Override
protected void addCustomFields(XContentBuilder builder, ClusterRerouteResponse response) throws IOException {
builder.startObject("state");
// by default, filter metadata
if (request.param("filter_metadata") == null) {
@ -80,12 +75,6 @@ public class RestClusterRerouteAction extends BaseRestHandler {
}
response.getState().settingsFilter(settingsFilter).toXContent(builder, request);
builder.endObject();
builder.endObject();
channel.sendResponse(new XContentRestResponse(request, RestStatus.OK, builder));
} catch (Throwable e) {
onFailure(e);
}
}
@Override
@ -93,11 +82,7 @@ public class RestClusterRerouteAction extends BaseRestHandler {
if (logger.isDebugEnabled()) {
logger.debug("failed to handle cluster reroute", e);
}
try {
channel.sendResponse(new XContentThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
super.onFailure(e);
}
});
}

View File

@ -20,6 +20,10 @@
package org.elasticsearch.cluster.ack;
import com.google.common.collect.ImmutableList;
import org.elasticsearch.action.admin.cluster.node.info.NodeInfo;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.reroute.ClusterRerouteResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.delete.DeleteMappingResponse;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
@ -28,6 +32,12 @@ import org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse
import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.MutableShardRouting;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.ShardRoutingState;
import org.elasticsearch.cluster.routing.allocation.command.MoveAllocationCommand;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.query.QueryBuilders;
@ -37,10 +47,10 @@ import org.junit.Test;
import java.util.Map;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.test.AbstractIntegrationTest.ClusterScope;
import static org.elasticsearch.test.AbstractIntegrationTest.Scope.*;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
import static org.elasticsearch.test.AbstractIntegrationTest.Scope.SUITE;
import static org.hamcrest.Matchers.*;
@ClusterScope(scope = SUITE)
public class AckTests extends AbstractIntegrationTest {
@ -121,8 +131,111 @@ public class AckTests extends AbstractIntegrationTest {
assertThat(deleteMappingResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
getMappingsResponse = client.admin().indices().prepareGetMappings("test").addTypes("type1").get();
getMappingsResponse = client.admin().indices().prepareGetMappings("test").addTypes("type1").setLocal(true).get();
assertThat(getMappingsResponse.mappings().size(), equalTo(0));
}
}
@Test
public void testClusterRerouteAcknowledgement() throws InterruptedException {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_replicas", 0)).get();
ensureGreen();
MoveAllocationCommand moveAllocationCommand = getAllocationCommand();
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().add(moveAllocationCommand).get();
assertThat(clusterRerouteResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get();
RoutingNode routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.fromNode());
for (MutableShardRouting mutableShardRouting : routingNode) {
//if the shard that we wanted to move is still on the same node, it must be relocating
if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) {
assertThat(mutableShardRouting.relocating(), equalTo(true));
}
}
routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.toNode());
boolean found = false;
for (MutableShardRouting mutableShardRouting : routingNode) {
if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) {
assertThat(mutableShardRouting.state(), anyOf(equalTo(ShardRoutingState.INITIALIZING), equalTo(ShardRoutingState.STARTED)));
found = true;
break;
}
}
assertThat(found, equalTo(true));
}
//let's wait for the relocation to be completed, otherwise there can be issues with after test checks (mock directory wrapper etc.)
waitForRelocation();
}
@Test
public void testClusterRerouteAcknowledgementDryRun() throws InterruptedException {
client().admin().indices().prepareCreate("test")
.setSettings(settingsBuilder()
.put("number_of_shards", atLeast(cluster().numNodes()))
.put("number_of_replicas", 0)).get();
ensureGreen();
MoveAllocationCommand moveAllocationCommand = getAllocationCommand();
ClusterRerouteResponse clusterRerouteResponse = client().admin().cluster().prepareReroute().setDryRun(true).add(moveAllocationCommand).get();
assertThat(clusterRerouteResponse.isAcknowledged(), equalTo(true));
for (Client client : clients()) {
ClusterStateResponse clusterStateResponse = client.admin().cluster().prepareState().setLocal(true).get();
RoutingNode routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.fromNode());
for (MutableShardRouting mutableShardRouting : routingNode) {
//the shard that we wanted to move is still on the same node, as we had dryRun flag
if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) {
assertThat(mutableShardRouting.started(), equalTo(true));
}
}
routingNode = clusterStateResponse.getState().routingNodes().nodesToShards().get(moveAllocationCommand.toNode());
boolean found = false;
for (MutableShardRouting mutableShardRouting : routingNode) {
if (mutableShardRouting.shardId().equals(moveAllocationCommand.shardId())) {
found = true;
break;
}
}
assertThat(found, equalTo(false));
}
}
private static MoveAllocationCommand getAllocationCommand() {
String fromNodeId = null;
String toNodeId = null;
MutableShardRouting shardToBeMoved = null;
ClusterStateResponse clusterStateResponse = client().admin().cluster().prepareState().get();
for (RoutingNode routingNode : clusterStateResponse.getState().routingNodes().nodesToShards().values()) {
if (routingNode.node().isDataNode()) {
if (fromNodeId == null && routingNode.numberOfOwningShards() > 0) {
fromNodeId = routingNode.nodeId();
shardToBeMoved = routingNode.shards().get(0);
} else {
toNodeId = routingNode.nodeId();
}
if (toNodeId != null && fromNodeId != null) {
break;
}
}
}
assert fromNodeId != null;
assert toNodeId != null;
assert shardToBeMoved != null;
return new MoveAllocationCommand(shardToBeMoved.shardId(), fromNodeId, toNodeId);
}
}