Shutdown API: Improve behavior when shutting down the whole cluster, closes #250.

This commit is contained in:
kimchy 2010-07-11 20:41:58 +03:00
parent 09493691a7
commit 294f09a1d7
14 changed files with 309 additions and 164 deletions

View File

@ -19,6 +19,9 @@
package org.elasticsearch.action; package org.elasticsearch.action;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
@ -31,4 +34,31 @@ public class Actions {
validationException.addValidationError(error); validationException.addValidationError(error);
return validationException; return validationException;
} }
public static boolean isAllNodes(String... nodesIds) {
return nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"));
}
public static String[] buildNodesIds(DiscoveryNodes nodes, String... nodesIds) {
if (isAllNodes(nodesIds)) {
int index = 0;
nodesIds = new String[nodes.size()];
for (DiscoveryNode node : nodes) {
nodesIds[index++] = node.id();
}
return nodesIds;
} else {
String[] resolvedNodesIds = new String[nodesIds.length];
for (int i = 0; i < nodesIds.length; i++) {
if (nodesIds[i].equals("_local")) {
resolvedNodesIds[i] = nodes.localNodeId();
} else if (nodesIds[i].equals("_master")) {
resolvedNodesIds[i] = nodes.masterNodeId();
} else {
resolvedNodesIds[i] = nodesIds[i];
}
}
return resolvedNodesIds;
}
}
} }

View File

@ -64,7 +64,7 @@ public class TransportClusterHealthAction extends TransportMasterNodeOperationAc
return new ClusterHealthResponse(); return new ClusterHealthResponse();
} }
@Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request) throws ElasticSearchException { @Override protected ClusterHealthResponse masterOperation(ClusterHealthRequest request, ClusterState state) throws ElasticSearchException {
int waitFor = 3; int waitFor = 3;
if (request.waitForStatus() == null) { if (request.waitForStatus() == null) {
waitFor--; waitFor--;

View File

@ -19,7 +19,9 @@
package org.elasticsearch.action.admin.cluster.node.shutdown; package org.elasticsearch.action.admin.cluster.node.shutdown;
import org.elasticsearch.action.support.nodes.NodesOperationRequest; import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
@ -29,23 +31,24 @@ import java.io.IOException;
import static org.elasticsearch.common.unit.TimeValue.*; import static org.elasticsearch.common.unit.TimeValue.*;
/** /**
* A request to shutdown one ore more nodes (or the whole cluster).
*
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class NodesShutdownRequest extends NodesOperationRequest { public class NodesShutdownRequest extends MasterNodeOperationRequest {
String[] nodesIds = Strings.EMPTY_ARRAY;
TimeValue delay = TimeValue.timeValueSeconds(1); TimeValue delay = TimeValue.timeValueSeconds(1);
protected NodesShutdownRequest() { NodesShutdownRequest() {
} }
/**
* Shuts down nodes based on the nodes ids specified. If none are passed, <b>all</b>
* nodes will be shutdown.
*/
public NodesShutdownRequest(String... nodesIds) { public NodesShutdownRequest(String... nodesIds) {
super(nodesIds); this.nodesIds = nodesIds;
}
public NodesShutdownRequest nodesIds(String... nodesIds) {
this.nodesIds = nodesIds;
return this;
} }
/** /**
@ -56,6 +59,10 @@ public class NodesShutdownRequest extends NodesOperationRequest {
return this; return this;
} }
public TimeValue delay() {
return this.delay;
}
/** /**
* The delay for the shutdown to occur. Defaults to <tt>1s</tt>. * The delay for the shutdown to occur. Defaults to <tt>1s</tt>.
*/ */
@ -63,17 +70,32 @@ public class NodesShutdownRequest extends NodesOperationRequest {
return delay(TimeValue.parseTimeValue(delay, null)); return delay(TimeValue.parseTimeValue(delay, null));
} }
public TimeValue delay() { @Override public ActionRequestValidationException validate() {
return this.delay; return null;
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); super.readFrom(in);
delay = readTimeValue(in); delay = readTimeValue(in);
int size = in.readVInt();
if (size > 0) {
nodesIds = new String[size];
for (int i = 0; i < nodesIds.length; i++) {
nodesIds[i] = in.readUTF();
}
}
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); super.writeTo(out);
delay.writeTo(out); delay.writeTo(out);
if (nodesIds == null) {
out.writeVInt(0);
} else {
out.writeVInt(nodesIds.length);
for (String nodeId : nodesIds) {
out.writeUTF(nodeId);
}
}
} }
} }

View File

@ -19,8 +19,7 @@
package org.elasticsearch.action.admin.cluster.node.shutdown; package org.elasticsearch.action.admin.cluster.node.shutdown;
import org.elasticsearch.action.support.nodes.NodeOperationResponse; import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.nodes.NodesOperationResponse;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -31,44 +30,49 @@ import java.io.IOException;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class NodesShutdownResponse extends NodesOperationResponse<NodesShutdownResponse.NodeShutdownResponse> { public class NodesShutdownResponse implements ActionResponse {
private ClusterName clusterName;
private DiscoveryNode[] nodes;
NodesShutdownResponse() { NodesShutdownResponse() {
} }
public NodesShutdownResponse(ClusterName clusterName, NodeShutdownResponse[] nodes) { public NodesShutdownResponse(ClusterName clusterName, DiscoveryNode[] nodes) {
super(clusterName, nodes); this.clusterName = clusterName;
this.nodes = nodes;
}
public ClusterName clusterName() {
return this.clusterName;
}
public ClusterName getClusterName() {
return clusterName();
}
public DiscoveryNode[] nodes() {
return this.nodes;
}
public DiscoveryNode[] getNodes() {
return nodes();
} }
@Override public void readFrom(StreamInput in) throws IOException { @Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in); clusterName = ClusterName.readClusterName(in);
nodes = new NodeShutdownResponse[in.readVInt()]; nodes = new DiscoveryNode[in.readVInt()];
for (int i = 0; i < nodes.length; i++) { for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeShutdownResponse.readNodeShutdownResponse(in); nodes[i] = DiscoveryNode.readNode(in);
} }
} }
@Override public void writeTo(StreamOutput out) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out); clusterName.writeTo(out);
out.writeVInt(nodes.length); out.writeVInt(nodes.length);
for (NodeShutdownResponse node : nodes) { for (DiscoveryNode node : nodes) {
node.writeTo(out); node.writeTo(out);
} }
} }
}
public static class NodeShutdownResponse extends NodeOperationResponse {
NodeShutdownResponse() {
}
public NodeShutdownResponse(DiscoveryNode node) {
super(node);
}
public static NodeShutdownResponse readNodeShutdownResponse(StreamInput in) throws IOException {
NodeShutdownResponse res = new NodeShutdownResponse();
res.readFrom(in);
return res;
}
}
}

View File

@ -21,143 +21,236 @@ package org.elasticsearch.action.admin.cluster.node.shutdown;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException; import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.Actions;
import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.nodes.NodeOperationRequest; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node; import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService; import org.elasticsearch.transport.*;
import java.io.IOException; import java.util.Set;
import java.util.List; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.common.collect.Lists.*;
import static org.elasticsearch.common.unit.TimeValue.*;
/** /**
* @author kimchy (shay.banon) * @author kimchy (shay.banon)
*/ */
public class TransportNodesShutdownAction extends TransportNodesOperationAction<NodesShutdownRequest, NodesShutdownResponse, TransportNodesShutdownAction.NodeShutdownRequest, NodesShutdownResponse.NodeShutdownResponse> { public class TransportNodesShutdownAction extends TransportMasterNodeOperationAction<NodesShutdownRequest, NodesShutdownResponse> {
private final Node node; private final Node node;
private final ClusterName clusterName;
private final boolean disabled; private final boolean disabled;
@Inject public TransportNodesShutdownAction(Settings settings, ClusterName clusterName, ThreadPool threadPool, private final TimeValue delay;
ClusterService clusterService, TransportService transportService,
Node node) { @Inject public TransportNodesShutdownAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
super(settings, clusterName, threadPool, clusterService, transportService); Node node, ClusterName clusterName) {
super(settings, transportService, clusterService, threadPool);
this.node = node; this.node = node;
disabled = componentSettings.getAsBoolean("disabled", false); this.clusterName = clusterName;
this.disabled = componentSettings.getAsBoolean("disabled", false);
this.delay = componentSettings.getAsTime("delay", TimeValue.timeValueMillis(200));
this.transportService.registerHandler(NodeShutdownRequestHandler.ACTION, new NodeShutdownRequestHandler());
} }
@Override protected String transportAction() { @Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.SHUTDOWN; return TransportActions.Admin.Cluster.Node.SHUTDOWN;
} }
@Override protected String transportNodeAction() {
return "/cluster/nodes/shutdown/node";
}
@Override protected NodesShutdownResponse newResponse(NodesShutdownRequest nodesShutdownRequest, AtomicReferenceArray responses) {
final List<NodesShutdownResponse.NodeShutdownResponse> nodeShutdownResponses = newArrayList();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodesShutdownResponse.NodeShutdownResponse) {
nodeShutdownResponses.add((NodesShutdownResponse.NodeShutdownResponse) resp);
}
}
return new NodesShutdownResponse(clusterName, nodeShutdownResponses.toArray(new NodesShutdownResponse.NodeShutdownResponse[nodeShutdownResponses.size()]));
}
@Override protected NodesShutdownRequest newRequest() { @Override protected NodesShutdownRequest newRequest() {
return new NodesShutdownRequest(); return new NodesShutdownRequest();
} }
@Override protected NodeShutdownRequest newNodeRequest() { @Override protected NodesShutdownResponse newResponse() {
return new NodeShutdownRequest(); return new NodesShutdownResponse();
} }
@Override protected NodeShutdownRequest newNodeRequest(String nodeId, NodesShutdownRequest request) { @Override protected void processBeforeDelegationToMaster(NodesShutdownRequest request, ClusterState state) {
return new NodeShutdownRequest(nodeId, request.delay); String[] nodesIds = request.nodesIds;
if (nodesIds != null) {
for (int i = 0; i < nodesIds.length; i++) {
// replace the _local one, since it looses its meaning when going over to the master...
if ("_local".equals(nodesIds[i])) {
nodesIds[i] = state.nodes().localNodeId();
}
}
}
} }
@Override protected NodesShutdownResponse.NodeShutdownResponse newNodeResponse() { @Override protected NodesShutdownResponse masterOperation(final NodesShutdownRequest request, final ClusterState state) throws ElasticSearchException {
return new NodesShutdownResponse.NodeShutdownResponse();
}
@Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(final NodeShutdownRequest request) throws ElasticSearchException {
if (disabled) { if (disabled) {
throw new ElasticSearchIllegalStateException("Shutdown is disabled"); throw new ElasticSearchIllegalStateException("Shutdown is disabled");
} }
logger.info("Shutting down in [{}]", request.delay); Set<DiscoveryNode> nodes = Sets.newHashSet();
Thread t = new Thread(new Runnable() { if (Actions.isAllNodes(request.nodesIds)) {
@Override public void run() { logger.info("[cluster_shutdown]: requested, shutting down in [{}]", request.delay);
try { nodes.addAll(state.nodes().nodes().values());
Thread.sleep(request.delay.millis()); Thread t = new Thread(new Runnable() {
} catch (InterruptedException e) { @Override public void run() {
// ignore
}
boolean shutdownWithWrapper = false;
if (System.getProperty("elasticsearch-service") != null) {
try { try {
Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager"); Thread.sleep(request.delay.millis());
logger.info("Initiating requested shutdown (using service)"); } catch (InterruptedException e) {
wrapperManager.getMethod("stopAndReturn", int.class).invoke(null, 0); // ignore
shutdownWithWrapper = true; }
} catch (Throwable e) { // first, stop the cluster service
e.printStackTrace(); logger.trace("[cluster_shutdown]: stopping the cluster service so no re-routing will occur");
clusterService.stop();
final CountDownLatch latch = new CountDownLatch(state.nodes().size());
for (final DiscoveryNode node : state.nodes()) {
if (node.id().equals(state.nodes().masterNodeId())) {
// don't shutdown the master yet...
latch.countDown();
} else {
logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
}
@Override public void handleException(RemoteTransportException exp) {
logger.warn("[cluster_shutdown]: received failed shutdown response from [{}]", exp, node);
latch.countDown();
}
});
}
} }
}
if (!shutdownWithWrapper) {
logger.info("Initiating requested shutdown");
try { try {
node.close(); latch.await();
} catch (Exception e) { } catch (InterruptedException e) {
logger.warn("Failed to shutdown", e); // ignore
} finally {
// make sure we initiate the shutdown hooks, so the Bootstrap#main thread will exit
System.exit(0);
} }
logger.info("[cluster_shutdown]: done shutting done all nodes except master, proceeding to master");
// now, kill the master
logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode());
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[cluster_shutdown]: received shutdown response from master");
}
@Override public void handleException(RemoteTransportException exp) {
logger.warn("[cluster_shutdown]: received failed shutdown response master", exp);
}
});
}
});
t.start();
} else {
final String[] nodesIds = Actions.buildNodesIds(state.nodes(), request.nodesIds);
logger.info("[partial_cluster_shutdown]: requested, shutting down [{}] in [{}]", nodesIds, request.delay);
for (String nodeId : nodesIds) {
final DiscoveryNode node = state.nodes().get(nodeId);
if (node != null) {
nodes.add(node);
} }
} }
});
t.start(); Thread t = new Thread(new Runnable() {
return new NodesShutdownResponse.NodeShutdownResponse(clusterService.state().nodes().localNode()); @Override public void run() {
try {
Thread.sleep(request.delay.millis());
} catch (InterruptedException e) {
// ignore
}
final CountDownLatch latch = new CountDownLatch(nodesIds.length);
for (String nodeId : nodesIds) {
final DiscoveryNode node = state.nodes().get(nodeId);
if (node == null) {
logger.warn("[partial_cluster_shutdown]: no node to shutdown for node_id [{}]", nodeId);
latch.countDown();
continue;
}
logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler() {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
}
@Override public void handleException(RemoteTransportException exp) {
logger.warn("[partial_cluster_shutdown]: received failed shutdown response from [{}]", exp, node);
latch.countDown();
}
});
}
try {
latch.await();
} catch (InterruptedException e) {
// ignore
}
logger.info("[partial_cluster_shutdown]: done shutting down [{}]", nodesIds);
}
});
t.start();
}
return new NodesShutdownResponse(clusterName, nodes.toArray(new DiscoveryNode[nodes.size()]));
} }
@Override protected boolean accumulateExceptions() { private class NodeShutdownRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
return false;
}
protected static class NodeShutdownRequest extends NodeOperationRequest { static final String ACTION = "/cluster/nodes/shutdown/node";
TimeValue delay; @Override public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
private NodeShutdownRequest() {
} }
private NodeShutdownRequest(String nodeId, TimeValue delay) { @Override public void messageReceived(VoidStreamable request, TransportChannel channel) throws Exception {
super(nodeId); if (disabled) {
this.delay = delay; throw new ElasticSearchIllegalStateException("Shutdown is disabled");
} }
logger.info("shutting down in [{}]", delay);
Thread t = new Thread(new Runnable() {
@Override public void run() {
try {
Thread.sleep(delay.millis());
} catch (InterruptedException e) {
// ignore
}
boolean shutdownWithWrapper = false;
if (System.getProperty("elasticsearch-service") != null) {
try {
Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager");
logger.info("initiating requested shutdown (using service)");
wrapperManager.getMethod("stopAndReturn", int.class).invoke(null, 0);
shutdownWithWrapper = true;
} catch (Throwable e) {
e.printStackTrace();
}
}
if (!shutdownWithWrapper) {
logger.info("initiating requested shutdown...");
try {
node.close();
} catch (Exception e) {
logger.warn("Failed to shutdown", e);
} finally {
// make sure we initiate the shutdown hooks, so the Bootstrap#main thread will exit
System.exit(0);
}
}
}
});
t.start();
@Override public void readFrom(StreamInput in) throws IOException { channel.sendResponse(VoidStreamable.INSTANCE);
super.readFrom(in);
delay = readTimeValue(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
delay.writeTo(out);
} }
} }
} }

View File

@ -62,7 +62,7 @@ public class TransportClusterStateAction extends TransportMasterNodeOperationAct
return new ClusterStateResponse(); return new ClusterStateResponse();
} }
@Override protected ClusterStateResponse masterOperation(ClusterStateRequest request) throws ElasticSearchException { @Override protected ClusterStateResponse masterOperation(ClusterStateRequest request, ClusterState state) throws ElasticSearchException {
ClusterState currentState = clusterService.state(); ClusterState currentState = clusterService.state();
ClusterState.Builder builder = newClusterStateBuilder(); ClusterState.Builder builder = newClusterStateBuilder();
if (!request.filterNodes()) { if (!request.filterNodes()) {

View File

@ -63,7 +63,7 @@ public class TransportIndicesAliasesAction extends TransportMasterNodeOperationA
} }
} }
@Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request) throws ElasticSearchException { @Override protected IndicesAliasesResponse masterOperation(IndicesAliasesRequest request, ClusterState state) throws ElasticSearchException {
MetaDataService.IndicesAliasesResult indicesAliasesResult = metaDataService.indicesAliases(request.aliasActions()); MetaDataService.IndicesAliasesResult indicesAliasesResult = metaDataService.indicesAliases(request.aliasActions());
return new IndicesAliasesResponse(); return new IndicesAliasesResponse();
} }

View File

@ -62,7 +62,7 @@ public class TransportCreateIndexAction extends TransportMasterNodeOperationActi
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index()); state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index());
} }
@Override protected CreateIndexResponse masterOperation(CreateIndexRequest request) throws ElasticSearchException { @Override protected CreateIndexResponse masterOperation(CreateIndexRequest request, ClusterState state) throws ElasticSearchException {
String cause = request.cause(); String cause = request.cause();
if (cause.length() == 0) { if (cause.length() == 0) {
cause = "api"; cause = "api";

View File

@ -62,7 +62,7 @@ public class TransportDeleteIndexAction extends TransportMasterNodeOperationActi
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index()); state.blocks().indexBlockedRaiseException(ClusterBlockLevel.METADATA, request.index());
} }
@Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request) throws ElasticSearchException { @Override protected DeleteIndexResponse masterOperation(DeleteIndexRequest request, ClusterState state) throws ElasticSearchException {
MetaDataService.DeleteIndexResult deleteIndexResult = metaDataService.deleteIndex(request.index(), request.timeout()); MetaDataService.DeleteIndexResult deleteIndexResult = metaDataService.deleteIndex(request.index(), request.timeout());
return new DeleteIndexResponse(deleteIndexResult.acknowledged()); return new DeleteIndexResponse(deleteIndexResult.acknowledged());
} }

View File

@ -68,7 +68,7 @@ public class TransportPutMappingAction extends TransportMasterNodeOperationActio
} }
} }
@Override protected PutMappingResponse masterOperation(PutMappingRequest request) throws ElasticSearchException { @Override protected PutMappingResponse masterOperation(PutMappingRequest request, ClusterState state) throws ElasticSearchException {
ClusterState clusterState = clusterService.state(); ClusterState clusterState = clusterService.state();
// update to concrete indices // update to concrete indices

View File

@ -34,7 +34,7 @@ import org.elasticsearch.transport.*;
/** /**
* A base class for operations that needs to be performed on the master node. * A base class for operations that needs to be performed on the master node.
* *
* @author kimchy (Shay Banon) * @author kimchy (shay.banon)
*/ */
public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> { public abstract class TransportMasterNodeOperationAction<Request extends MasterNodeOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
@ -59,20 +59,25 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
protected abstract Response newResponse(); protected abstract Response newResponse();
protected abstract Response masterOperation(Request request) throws ElasticSearchException; protected abstract Response masterOperation(Request request, ClusterState state) throws ElasticSearchException;
protected void checkBlock(Request request, ClusterState state) { protected void checkBlock(Request request, ClusterState state) {
} }
protected void processBeforeDelegationToMaster(Request request, ClusterState state) {
}
@Override protected void doExecute(final Request request, final ActionListener<Response> listener) { @Override protected void doExecute(final Request request, final ActionListener<Response> listener) {
DiscoveryNodes nodes = clusterService.state().nodes(); final ClusterState clusterState = clusterService.state();
DiscoveryNodes nodes = clusterState.nodes();
if (nodes.localNodeMaster()) { if (nodes.localNodeMaster()) {
threadPool.execute(new Runnable() { threadPool.execute(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
checkBlock(request, clusterService.state()); checkBlock(request, clusterState);
Response response = masterOperation(request); Response response = masterOperation(request, clusterState);
listener.onResponse(response); listener.onResponse(response);
} catch (Exception e) { } catch (Exception e) {
listener.onFailure(e); listener.onFailure(e);
@ -83,6 +88,7 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
if (nodes.masterNode() == null) { if (nodes.masterNode() == null) {
throw new ElasticSearchIllegalStateException("No master node discovered or set"); throw new ElasticSearchIllegalStateException("No master node discovered or set");
} }
processBeforeDelegationToMaster(request, clusterState);
transportService.sendRequest(nodes.masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() { transportService.sendRequest(nodes.masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() {
@Override public Response newInstance() { @Override public Response newInstance() {
return newResponse(); return newResponse();
@ -106,9 +112,10 @@ public abstract class TransportMasterNodeOperationAction<Request extends MasterN
} }
@Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception { @Override public void messageReceived(final Request request, final TransportChannel channel) throws Exception {
if (clusterService.state().nodes().localNodeMaster()) { final ClusterState clusterState = clusterService.state();
checkBlock(request, clusterService.state()); if (clusterState.nodes().localNodeMaster()) {
Response response = masterOperation(request); checkBlock(request, clusterState);
Response response = masterOperation(request, clusterState);
channel.sendResponse(response); channel.sendResponse(response);
} else { } else {
transportService.sendRequest(clusterService.state().nodes().masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() { transportService.sendRequest(clusterService.state().nodes().masterNode(), transportAction(), request, new BaseTransportResponseHandler<Response>() {

View File

@ -21,6 +21,7 @@ package org.elasticsearch.action.support.nodes;
import org.elasticsearch.ElasticSearchException; import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.Actions;
import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.NoSuchNodeException; import org.elasticsearch.action.NoSuchNodeException;
import org.elasticsearch.action.support.BaseAction; import org.elasticsearch.action.support.BaseAction;
@ -108,21 +109,7 @@ public abstract class TransportNodesOperationAction<Request extends NodesOperati
this.request = request; this.request = request;
this.listener = listener; this.listener = listener;
clusterState = clusterService.state(); clusterState = clusterService.state();
String[] nodesIds = request.nodesIds(); String[] nodesIds = Actions.buildNodesIds(clusterState.nodes(), request.nodesIds());
if (nodesIds == null || nodesIds.length == 0 || (nodesIds.length == 1 && nodesIds[0].equals("_all"))) {
int index = 0;
nodesIds = new String[clusterState.nodes().size()];
for (DiscoveryNode node : clusterState.nodes()) {
nodesIds[index++] = node.id();
}
}
for (int i = 0; i < nodesIds.length; i++) {
if (nodesIds[i].equals("_local")) {
nodesIds[i] = clusterState.nodes().localNodeId();
} else if (nodesIds[i].equals("_master")) {
nodesIds[i] = clusterState.nodes().masterNodeId();
}
}
this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds); this.nodesIds = filterNodeIds(clusterState.nodes(), nodesIds);
this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length); this.responses = new AtomicReferenceArray<Object>(this.nodesIds.length);
} }

View File

@ -25,6 +25,7 @@ import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.MasterNodeOperationRequest; import org.elasticsearch.action.support.master.MasterNodeOperationRequest;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction; import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MetaDataService; import org.elasticsearch.cluster.metadata.MetaDataService;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
@ -63,7 +64,7 @@ public class MappingUpdatedAction extends TransportMasterNodeOperationAction<Map
return new MappingUpdatedResponse(); return new MappingUpdatedResponse();
} }
@Override protected MappingUpdatedResponse masterOperation(MappingUpdatedRequest request) throws ElasticSearchException { @Override protected MappingUpdatedResponse masterOperation(MappingUpdatedRequest request, ClusterState state) throws ElasticSearchException {
metaDataService.updateMapping(request.index(), request.type(), request.mappingSource()); metaDataService.updateMapping(request.index(), request.type(), request.mappingSource());
return new MappingUpdatedResponse(); return new MappingUpdatedResponse();
} }

View File

@ -23,6 +23,7 @@ import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest; import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse; import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.builder.XContentBuilder; import org.elasticsearch.common.xcontent.builder.XContentBuilder;
@ -51,16 +52,16 @@ public class RestNodesShutdownAction extends BaseRestHandler {
nodesShutdownRequest.listenerThreaded(false); nodesShutdownRequest.listenerThreaded(false);
nodesShutdownRequest.delay(request.paramAsTime("delay", nodesShutdownRequest.delay())); nodesShutdownRequest.delay(request.paramAsTime("delay", nodesShutdownRequest.delay()));
client.admin().cluster().nodesShutdown(nodesShutdownRequest, new ActionListener<NodesShutdownResponse>() { client.admin().cluster().nodesShutdown(nodesShutdownRequest, new ActionListener<NodesShutdownResponse>() {
@Override public void onResponse(NodesShutdownResponse result) { @Override public void onResponse(NodesShutdownResponse response) {
try { try {
XContentBuilder builder = restContentBuilder(request); XContentBuilder builder = restContentBuilder(request);
builder.startObject(); builder.startObject();
builder.field("cluster_name", result.clusterName().value()); builder.field("cluster_name", response.clusterName().value());
builder.startObject("nodes"); builder.startObject("nodes");
for (NodesShutdownResponse.NodeShutdownResponse nodeInfo : result) { for (DiscoveryNode node : response.nodes()) {
builder.startObject(nodeInfo.node().id()); builder.startObject(node.id());
builder.field("name", nodeInfo.node().name()); builder.field("name", node.name());
builder.endObject(); builder.endObject();
} }
builder.endObject(); builder.endObject();