Restart API: Allow to restart one or more nodes, closes #155.

This commit is contained in:
kimchy 2010-05-01 03:00:06 +03:00
parent 97958c3a66
commit ebded19dc1
16 changed files with 580 additions and 17 deletions

View File

@ -21,7 +21,8 @@ package org.elasticsearch.action;
import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdown;
import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction;
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdownAction;
import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexReplicationPingAction;
import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicationPingAction;
@ -62,7 +63,8 @@ public class TransportActionModule extends AbstractModule {
@Override protected void configure() {
bind(TransportNodesInfo.class).asEagerSingleton();
bind(TransportNodesShutdown.class).asEagerSingleton();
bind(TransportNodesShutdownAction.class).asEagerSingleton();
bind(TransportNodesRestartAction.class).asEagerSingleton();
bind(TransportClusterStateAction.class).asEagerSingleton();
bind(TransportClusterHealthAction.class).asEagerSingleton();

View File

@ -74,6 +74,7 @@ public class TransportActions {
public static class Node {
public static final String INFO = "/cluster/nodes/info";
public static final String SHUTDOWN = "/cluster/nodes/shutdown";
public static final String RESTART = "/cluster/nodes/restart";
}
public static class Ping {

View File

@ -0,0 +1,72 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.restart;
import org.elasticsearch.action.support.nodes.NodesOperationRequest;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import java.io.IOException;
import static org.elasticsearch.util.TimeValue.*;
/**
* A request to restart one ore more nodes (or the whole cluster).
*
* @author kimchy (shay.banon)
*/
public class NodesRestartRequest extends NodesOperationRequest {
TimeValue delay = TimeValue.timeValueSeconds(1);
protected NodesRestartRequest() {
}
/**
* Restarts down nodes based on the nodes ids specified. If none are passed, <b>all</b>
* nodes will be shutdown.
*/
public NodesRestartRequest(String... nodesIds) {
super(nodesIds);
}
/**
* The delay for the restart to occur. Defaults to <tt>1s</tt>.
*/
public NodesRestartRequest delay(TimeValue delay) {
this.delay = delay;
return this;
}
public TimeValue delay() {
return this.delay;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
delay = readTimeValue(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
delay.writeTo(out);
}
}

View File

@ -0,0 +1,74 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.restart;
import org.elasticsearch.action.support.nodes.NodeOperationResponse;
import org.elasticsearch.action.support.nodes.NodesOperationResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import java.io.IOException;
/**
* @author kimchy (shay.banon)
*/
public class NodesRestartResponse extends NodesOperationResponse<NodesRestartResponse.NodeRestartResponse> {
NodesRestartResponse() {
}
public NodesRestartResponse(ClusterName clusterName, NodeRestartResponse[] nodes) {
super(clusterName, nodes);
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
nodes = new NodeRestartResponse[in.readVInt()];
for (int i = 0; i < nodes.length; i++) {
nodes[i] = NodeRestartResponse.readNodeRestartResponse(in);
}
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeVInt(nodes.length);
for (NodeRestartResponse node : nodes) {
node.writeTo(out);
}
}
public static class NodeRestartResponse extends NodeOperationResponse {
NodeRestartResponse() {
}
public NodeRestartResponse(DiscoveryNode node) {
super(node);
}
public static NodeRestartResponse readNodeRestartResponse(StreamInput in) throws IOException {
NodeRestartResponse res = new NodeRestartResponse();
res.readFrom(in);
return res;
}
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.action.admin.cluster.node.restart;
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ElasticSearchIllegalStateException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.support.nodes.NodeOperationRequest;
import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.node.Node;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.TimeValue;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.io.stream.StreamInput;
import org.elasticsearch.util.io.stream.StreamOutput;
import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.util.TimeValue.*;
import static org.elasticsearch.util.gcommon.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
public class TransportNodesRestartAction extends TransportNodesOperationAction<NodesRestartRequest, NodesRestartResponse, TransportNodesRestartAction.NodeRestartRequest, NodesRestartResponse.NodeRestartResponse> {
private final Node node;
private final boolean disabled;
@Inject public TransportNodesRestartAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
Node node) {
super(settings, clusterName, threadPool, clusterService, transportService);
this.node = node;
disabled = componentSettings.getAsBoolean("disabled", false);
}
@Override protected String transportAction() {
return TransportActions.Admin.Cluster.Node.RESTART;
}
@Override protected String transportNodeAction() {
return "/cluster/nodes/restart/node";
}
@Override protected NodesRestartResponse newResponse(NodesRestartRequest nodesShutdownRequest, AtomicReferenceArray responses) {
final List<NodesRestartResponse.NodeRestartResponse> nodeRestartResponses = newArrayList();
for (int i = 0; i < responses.length(); i++) {
Object resp = responses.get(i);
if (resp instanceof NodesRestartResponse.NodeRestartResponse) {
nodeRestartResponses.add((NodesRestartResponse.NodeRestartResponse) resp);
}
}
return new NodesRestartResponse(clusterName, nodeRestartResponses.toArray(new NodesRestartResponse.NodeRestartResponse[nodeRestartResponses.size()]));
}
@Override protected NodesRestartRequest newRequest() {
return new NodesRestartRequest();
}
@Override protected NodeRestartRequest newNodeRequest() {
return new NodeRestartRequest();
}
@Override protected NodeRestartRequest newNodeRequest(String nodeId, NodesRestartRequest request) {
return new NodeRestartRequest(nodeId, request.delay);
}
@Override protected NodesRestartResponse.NodeRestartResponse newNodeResponse() {
return new NodesRestartResponse.NodeRestartResponse();
}
@Override protected NodesRestartResponse.NodeRestartResponse nodeOperation(NodeRestartRequest request) throws ElasticSearchException {
if (disabled) {
throw new ElasticSearchIllegalStateException("Restart is disabled");
}
logger.info("Restarting in [{}]", request.delay);
threadPool.schedule(new Runnable() {
@Override public void run() {
boolean restartWithWrapper = false;
if (System.getProperty("elasticsearch-service") != null) {
try {
Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager");
logger.info("Initiating requested restart (using service)");
wrapperManager.getMethod("restartAndReturn").invoke(null);
restartWithWrapper = true;
} catch (Throwable e) {
e.printStackTrace();
}
}
if (!restartWithWrapper) {
logger.info("Initiating requested restart");
try {
node.stop();
node.start();
} catch (Exception e) {
logger.warn("Failed to restart", e);
}
}
}
}, request.delay.millis(), TimeUnit.MILLISECONDS);
return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
}
@Override protected boolean accumulateExceptions() {
return false;
}
protected static class NodeRestartRequest extends NodeOperationRequest {
TimeValue delay;
private NodeRestartRequest() {
}
private NodeRestartRequest(String nodeId, TimeValue delay) {
super(nodeId);
this.delay = delay;
}
@Override public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
delay = readTimeValue(in);
}
@Override public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
delay.writeTo(out);
}
}
}

View File

@ -37,7 +37,6 @@ import org.elasticsearch.util.settings.Settings;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceArray;
import static org.elasticsearch.util.TimeValue.*;
@ -46,15 +45,15 @@ import static org.elasticsearch.util.gcommon.collect.Lists.*;
/**
* @author kimchy (shay.banon)
*/
public class TransportNodesShutdown extends TransportNodesOperationAction<NodesShutdownRequest, NodesShutdownResponse, TransportNodesShutdown.NodeShutdownRequest, NodesShutdownResponse.NodeShutdownResponse> {
public class TransportNodesShutdownAction extends TransportNodesOperationAction<NodesShutdownRequest, NodesShutdownResponse, TransportNodesShutdownAction.NodeShutdownRequest, NodesShutdownResponse.NodeShutdownResponse> {
private final Node node;
private final boolean disabled;
@Inject public TransportNodesShutdown(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
Node node) {
@Inject public TransportNodesShutdownAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
ClusterService clusterService, TransportService transportService,
Node node) {
super(settings, clusterName, threadPool, clusterService, transportService);
this.node = node;
disabled = componentSettings.getAsBoolean("disabled", false);
@ -95,13 +94,18 @@ public class TransportNodesShutdown extends TransportNodesOperationAction<NodesS
return new NodesShutdownResponse.NodeShutdownResponse();
}
@Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(NodeShutdownRequest request) throws ElasticSearchException {
@Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(final NodeShutdownRequest request) throws ElasticSearchException {
if (disabled) {
throw new ElasticSearchIllegalStateException("Shutdown is disabled");
}
logger.info("Shutting down in [{}]", request.delay);
threadPool.schedule(new Runnable() {
Thread t = new Thread(new Runnable() {
@Override public void run() {
try {
Thread.sleep(request.delay.millis());
} catch (InterruptedException e) {
// ignore
}
boolean shutdownWithWrapper = false;
if (System.getProperty("elasticsearch-service") != null) {
try {
@ -115,10 +119,18 @@ public class TransportNodesShutdown extends TransportNodesOperationAction<NodesS
}
if (!shutdownWithWrapper) {
logger.info("Initiating requested shutdown");
node.close();
try {
node.close();
} catch (Exception e) {
logger.warn("Failed to shutdown", e);
} finally {
// make sure we initiate the shutdown hooks, so the Bootstrap#main thread will exit
System.exit(0);
}
}
}
}, request.delay.millis(), TimeUnit.MILLISECONDS);
});
t.start();
return new NodesShutdownResponse.NodeShutdownResponse(clusterService.state().nodes().localNode());
}

View File

@ -38,6 +38,7 @@ import org.elasticsearch.util.settings.Settings;
import java.io.File;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import static jline.ANSIBuffer.ANSICodes.*;
import static org.elasticsearch.util.gcommon.collect.Sets.*;
@ -165,6 +166,23 @@ public class Bootstrap {
if (!foreground) {
System.err.close();
}
// keep this thread alive (non daemon thread) until we shutdown
final CountDownLatch latch = new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override public void run() {
latch.countDown();
}
});
while (true) {
try {
latch.await();
} catch (InterruptedException e) {
// bail out
}
break;
}
} catch (Throwable e) {
ESLogger logger = Loggers.getLogger(Bootstrap.class);
if (bootstrap.node != null) {

View File

@ -25,6 +25,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
@ -94,7 +96,7 @@ public interface ClusterAdminClient {
*
* @param request The nodes info request
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#nodesInfo(String...)
* @see org.elasticsearch.client.Requests#nodesShutdown(String...)
*/
void nodesInfo(NodesInfoRequest request, ActionListener<NodesInfoResponse> listener);
@ -116,6 +118,24 @@ public interface ClusterAdminClient {
*/
void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener);
/**
* Restarts nodes in the cluster.
*
* @param request The nodes restart request
* @return The result future
* @see org.elasticsearch.client.Requests#nodesRestart(String...)
*/
ActionFuture<NodesRestartResponse> nodesRestart(NodesRestartRequest request);
/**
* Restarts nodes in the cluster.
*
* @param request The nodes restart request
* @param listener A listener to be notified with a result
* @see org.elasticsearch.client.Requests#nodesRestart(String...)
*/
void nodesRestart(NodesRestartRequest request, ActionListener<NodesRestartResponse> listener);
ActionFuture<SinglePingResponse> ping(SinglePingRequest request);
void ping(SinglePingRequest request, ActionListener<SinglePingResponse> listener);

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
@ -337,6 +338,24 @@ public class Requests {
return new NodesShutdownRequest(nodesIds);
}
/**
* Restarts all nodes in the cluster.
*/
public static NodesRestartRequest nodesRestart() {
return new NodesRestartRequest();
}
/**
* Restarts specific nodes in the cluster.
*
* @param nodesIds The nodes ids to restart
* @return The nodes info request
* @see org.elasticsearch.client.ClusterAdminClient#nodesRestart(org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest)
*/
public static NodesRestartRequest nodesRestart(String... nodesIds) {
return new NodesRestartRequest(nodesIds);
}
public static SinglePingRequest pingSingleRequest(String index) {
return new SinglePingRequest(index);
}

View File

@ -27,9 +27,12 @@ import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthActio
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdown;
import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdownAction;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingResponse;
import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
@ -64,17 +67,20 @@ public class NodeClusterAdminClient extends AbstractComponent implements Cluster
private final TransportNodesInfo nodesInfo;
private final TransportNodesShutdown nodesShutdown;
private final TransportNodesShutdownAction nodesShutdown;
private final TransportNodesRestartAction nodesRestart;
@Inject public NodeClusterAdminClient(Settings settings,
TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction,
TransportSinglePingAction singlePingAction, TransportBroadcastPingAction broadcastPingAction, TransportReplicationPingAction replicationPingAction,
TransportNodesInfo nodesInfo, TransportNodesShutdown nodesShutdown) {
TransportNodesInfo nodesInfo, TransportNodesShutdownAction nodesShutdown, TransportNodesRestartAction nodesRestart) {
super(settings);
this.clusterHealthAction = clusterHealthAction;
this.clusterStateAction = clusterStateAction;
this.nodesInfo = nodesInfo;
this.nodesShutdown = nodesShutdown;
this.nodesRestart = nodesRestart;
this.singlePingAction = singlePingAction;
this.broadcastPingAction = broadcastPingAction;
this.replicationPingAction = replicationPingAction;
@ -135,4 +141,12 @@ public class NodeClusterAdminClient extends AbstractComponent implements Cluster
@Override public void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener) {
nodesShutdown.execute(request, listener);
}
@Override public ActionFuture<NodesRestartResponse> nodesRestart(NodesRestartRequest request) {
return nodesRestart.execute(request);
}
@Override public void nodesRestart(NodesRestartRequest request, ActionListener<NodesRestartResponse> listener) {
nodesRestart.execute(request, listener);
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.client.transport.action;
import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.restart.ClientTransportNodesRestartAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.shutdown.ClientTransportNodesShutdownAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
@ -74,6 +75,7 @@ public class ClientTransportActionModule extends AbstractModule {
bind(ClientTransportNodesInfoAction.class).asEagerSingleton();
bind(ClientTransportNodesShutdownAction.class).asEagerSingleton();
bind(ClientTransportNodesRestartAction.class).asEagerSingleton();
bind(ClientTransportSinglePingAction.class).asEagerSingleton();
bind(ClientTransportReplicationPingAction.class).asEagerSingleton();
bind(ClientTransportBroadcastPingAction.class).asEagerSingleton();

View File

@ -0,0 +1,42 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.transport.action.admin.cluster.node.restart;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
/**
* @author kimchy (shay.banon)
*/
public class ClientTransportNodesRestartAction extends BaseClientTransportAction<NodesRestartRequest, NodesRestartResponse> {
@Inject public ClientTransportNodesRestartAction(Settings settings, TransportService transportService) {
super(settings, transportService, NodesRestartResponse.class);
}
@Override protected String action() {
return TransportActions.Admin.Cluster.Node.RESTART;
}
}

View File

@ -26,6 +26,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
@ -40,6 +42,7 @@ import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.transport.TransportClientNodesService;
import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.restart.ClientTransportNodesRestartAction;
import org.elasticsearch.client.transport.action.admin.cluster.node.shutdown.ClientTransportNodesShutdownAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
@ -71,16 +74,19 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
private final ClientTransportNodesShutdownAction nodesShutdownAction;
private final ClientTransportNodesRestartAction nodesRestartAction;
@Inject public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService,
ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction,
ClientTransportSinglePingAction singlePingAction, ClientTransportReplicationPingAction replicationPingAction, ClientTransportBroadcastPingAction broadcastPingAction,
ClientTransportNodesInfoAction nodesInfoAction, ClientTransportNodesShutdownAction nodesShutdownAction) {
ClientTransportNodesInfoAction nodesInfoAction, ClientTransportNodesShutdownAction nodesShutdownAction, ClientTransportNodesRestartAction nodesRestartAction) {
super(settings);
this.nodesService = nodesService;
this.clusterHealthAction = clusterHealthAction;
this.clusterStateAction = clusterStateAction;
this.nodesInfoAction = nodesInfoAction;
this.nodesShutdownAction = nodesShutdownAction;
this.nodesRestartAction = nodesRestartAction;
this.singlePingAction = singlePingAction;
this.replicationPingAction = replicationPingAction;
this.broadcastPingAction = broadcastPingAction;
@ -204,4 +210,21 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
}
});
}
@Override public ActionFuture<NodesRestartResponse> nodesRestart(final NodesRestartRequest request) {
return nodesService.execute(new TransportClientNodesService.NodeCallback<org.elasticsearch.action.ActionFuture<org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse>>() {
@Override public ActionFuture<NodesRestartResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
return nodesRestartAction.execute(node, request);
}
});
}
@Override public void nodesRestart(final NodesRestartRequest request, final ActionListener<NodesRestartResponse> listener) {
nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Void>>() {
@Override public ActionFuture<Void> doWithNode(DiscoveryNode node) throws ElasticSearchException {
nodesRestartAction.execute(node, request, listener);
return null;
}
});
}
}

View File

@ -21,6 +21,7 @@ package org.elasticsearch.rest.action;
import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
import org.elasticsearch.rest.action.admin.cluster.node.restart.RestNodesRestartAction;
import org.elasticsearch.rest.action.admin.cluster.node.shutdown.RestNodesShutdownAction;
import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastPingAction;
import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction;
@ -58,6 +59,7 @@ public class RestActionModule extends AbstractModule {
bind(RestNodesInfoAction.class).asEagerSingleton();
bind(RestNodesShutdownAction.class).asEagerSingleton();
bind(RestNodesRestartAction.class).asEagerSingleton();
bind(RestClusterStateAction.class).asEagerSingleton();
bind(RestClusterHealthAction.class).asEagerSingleton();

View File

@ -0,0 +1,84 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.rest.action.admin.cluster.node.restart;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.rest.*;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.util.guice.inject.Inject;
import org.elasticsearch.util.settings.Settings;
import org.elasticsearch.util.xcontent.builder.XContentBuilder;
import java.io.IOException;
import static org.elasticsearch.rest.action.support.RestXContentBuilder.*;
/**
* @author kimchy (shay.banon)
*/
public class RestNodesRestartAction extends BaseRestHandler {
@Inject public RestNodesRestartAction(Settings settings, Client client, RestController controller) {
super(settings, client);
controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/_restart", this);
controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/{nodeId}/_restart", this);
}
@Override public void handleRequest(final RestRequest request, final RestChannel channel) {
String[] nodesIds = RestActions.splitNodes(request.param("nodeId"));
NodesRestartRequest nodesRestartRequest = new NodesRestartRequest(nodesIds);
nodesRestartRequest.listenerThreaded(false);
nodesRestartRequest.delay(request.paramAsTime("delay", nodesRestartRequest.delay()));
client.admin().cluster().nodesRestart(nodesRestartRequest, new ActionListener<NodesRestartResponse>() {
@Override public void onResponse(NodesRestartResponse result) {
try {
XContentBuilder builder = restContentBuilder(request);
builder.startObject();
builder.field("cluster_name", result.clusterName().value());
builder.startObject("nodes");
for (NodesRestartResponse.NodeRestartResponse nodeInfo : result) {
builder.startObject(nodeInfo.node().id());
builder.field("name", nodeInfo.node().name());
builder.endObject();
}
builder.endObject();
builder.endObject();
channel.sendResponse(new JsonRestResponse(request, RestResponse.Status.OK, builder));
} catch (Exception e) {
onFailure(e);
}
}
@Override public void onFailure(Throwable e) {
try {
channel.sendResponse(new JsonThrowableRestResponse(request, e));
} catch (IOException e1) {
logger.error("Failed to send failure response", e1);
}
}
});
}
}

View File

@ -24,6 +24,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest
import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest
import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
@ -109,7 +111,7 @@ class GClusterAdminClient {
clusterAdminClient.nodesInfo(request, listener)
}
// NODES INFO
// NODES SHUTDOWN
GActionFuture<NodesShutdownResponse> nodesShutdown(Closure c) {
NodesShutdownRequest request = new NodesShutdownRequest()
@ -128,4 +130,24 @@ class GClusterAdminClient {
void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener) {
clusterAdminClient.nodesShutdown(request, listener)
}
// NODES RESTART
GActionFuture<NodesRestartResponse> nodesRestart(Closure c) {
NodesRestartRequest request = new NodesRestartRequest()
c.setDelegate request
c.resolveStrategy = gClient.resolveStrategy
c.call()
nodesRestart(request)
}
GActionFuture<NodesRestartResponse> nodesRestart(NodesRestartRequest request) {
GActionFuture<NodesRestartResponse> future = new GActionFuture<NodesRestartResponse>(internalClient.threadPool(), request);
clusterAdminClient.nodesRestart(request, future)
return future
}
void nodesRestart(NodesRestartRequest request, ActionListener<NodesRestartResponse> listener) {
clusterAdminClient.nodesRestart(request, listener)
}
}