add an (internal) flag if to control if the shutdown request should cause the jvm to exit or not

This commit is contained in:
kimchy 2011-07-14 22:39:44 +03:00
parent 848638d53c
commit 8532f433ce
6 changed files with 89 additions and 9 deletions

View File

@ -39,6 +39,8 @@ public class NodesShutdownRequest extends MasterNodeOperationRequest {
TimeValue delay = TimeValue.timeValueSeconds(1);
boolean exit = true;
NodesShutdownRequest() {
}
@ -70,6 +72,21 @@ public class NodesShutdownRequest extends MasterNodeOperationRequest {
return delay(TimeValue.parseTimeValue(delay, null));
}
/**
* Should the JVM be exited as well or not. Defaults to <tt>true</tt>.
*/
public NodesShutdownRequest exit(boolean exit) {
this.exit = exit;
return this;
}
/**
* Should the JVM be exited as well or not. Defaults to <tt>true</tt>.
*/
public boolean exit() {
return exit;
}
@Override public ActionRequestValidationException validate() {
return null;
}
@ -84,6 +101,7 @@ public class NodesShutdownRequest extends MasterNodeOperationRequest {
nodesIds[i] = in.readUTF();
}
}
exit = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
@ -97,5 +115,6 @@ public class NodesShutdownRequest extends MasterNodeOperationRequest {
out.writeUTF(nodeId);
}
}
out.writeBoolean(exit);
}
}

View File

@ -30,13 +30,21 @@ import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.io.stream.VoidStreamable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.*;
import org.elasticsearch.transport.BaseTransportRequestHandler;
import org.elasticsearch.transport.TransportChannel;
import org.elasticsearch.transport.TransportException;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.transport.VoidTransportResponseHandler;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -119,7 +127,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
latch.countDown();
} else {
logger.trace("[cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request.exit), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
@ -141,7 +149,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
// now, kill the master
logger.trace("[cluster_shutdown]: shutting down the master [{}]", state.nodes().masterNode());
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(state.nodes().masterNode(), NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request.exit), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[cluster_shutdown]: received shutdown response from master");
}
@ -182,7 +190,7 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
}
logger.trace("[partial_cluster_shutdown]: sending shutdown request to [{}]", node);
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, VoidStreamable.INSTANCE, new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
transportService.sendRequest(node, NodeShutdownRequestHandler.ACTION, new NodeShutdownRequest(request.exit), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@Override public void handleResponse(VoidStreamable response) {
logger.trace("[partial_cluster_shutdown]: received shutdown response from [{}]", node);
latch.countDown();
@ -209,19 +217,19 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
return new NodesShutdownResponse(clusterName, nodes.toArray(new DiscoveryNode[nodes.size()]));
}
private class NodeShutdownRequestHandler extends BaseTransportRequestHandler<VoidStreamable> {
private class NodeShutdownRequestHandler extends BaseTransportRequestHandler<NodeShutdownRequest> {
static final String ACTION = "/cluster/nodes/shutdown/node";
@Override public VoidStreamable newInstance() {
return VoidStreamable.INSTANCE;
@Override public NodeShutdownRequest newInstance() {
return new NodeShutdownRequest();
}
@Override public String executor() {
return ThreadPool.Names.SAME;
}
@Override public void messageReceived(VoidStreamable request, TransportChannel channel) throws Exception {
@Override public void messageReceived(final NodeShutdownRequest request, TransportChannel channel) throws Exception {
if (disabled) {
throw new ElasticSearchIllegalStateException("Shutdown is disabled");
}
@ -233,6 +241,15 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
} catch (InterruptedException e) {
// ignore
}
if (!request.exit) {
logger.info("initiating requested shutdown (no exit)...");
try {
node.close();
} catch (Exception e) {
logger.warn("Failed to shutdown", e);
}
return;
}
boolean shutdownWithWrapper = false;
if (System.getProperty("elasticsearch-service") != null) {
try {
@ -262,4 +279,24 @@ public class TransportNodesShutdownAction extends TransportMasterNodeOperationAc
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
static class NodeShutdownRequest implements Streamable {
boolean exit;
NodeShutdownRequest() {
}
NodeShutdownRequest(boolean exit) {
this.exit = exit;
}
@Override public void readFrom(StreamInput in) throws IOException {
exit = in.readBoolean();
}
@Override public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(exit);
}
}
}

View File

@ -59,6 +59,14 @@ public class NodesShutdownRequestBuilder extends BaseClusterRequestBuilder<Nodes
return this;
}
/**
* Should the JVM be exited as well or not. Defaults to <tt>true</tt>.
*/
public NodesShutdownRequestBuilder setExit(boolean exit) {
request.exit(exit);
return this;
}
/**
* Sets the master node timeout in case the master has not yet been discovered.
*/

View File

@ -57,4 +57,9 @@ public interface Node {
* Closes the node (and {@link #stop}s if its running).
*/
void close();
/**
* Returns <tt>true</tt> if the node is closed.
*/
boolean isClosed();
}

View File

@ -320,6 +320,10 @@ public final class InternalNode implements Node {
logger.info("{{}}[{}]: closed", Version.full(), JvmInfo.jvmInfo().pid());
}
@Override public boolean isClosed() {
return lifecycle.closed();
}
public Injector injector() {
return this.injector;
}

View File

@ -27,7 +27,13 @@ import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.XContentRestResponse;
import org.elasticsearch.rest.XContentThrowableRestResponse;
import org.elasticsearch.rest.action.support.RestActions;
import java.io.IOException;
@ -52,6 +58,7 @@ public class RestNodesShutdownAction extends BaseRestHandler {
NodesShutdownRequest nodesShutdownRequest = new NodesShutdownRequest(nodesIds);
nodesShutdownRequest.listenerThreaded(false);
nodesShutdownRequest.delay(request.paramAsTime("delay", nodesShutdownRequest.delay()));
nodesShutdownRequest.exit(request.paramAsBoolean("exit", nodesShutdownRequest.exit()));
client.admin().cluster().nodesShutdown(nodesShutdownRequest, new ActionListener<NodesShutdownResponse>() {
@Override public void onResponse(NodesShutdownResponse response) {
try {