From ebded19dc1b41ac3841d44935cd7e8def2ae47de Mon Sep 17 00:00:00 2001
From: kimchy <kimchy@gmail.com>
Date: Sat, 1 May 2010 03:00:06 +0300
Subject: [PATCH] Restart API: Allow to restart one or more nodes, closes #155.

---
 .../action/TransportActionModule.java         |   6 +-
 .../action/TransportActions.java              |   1 +
 .../node/restart/NodesRestartRequest.java     |  72 ++++++++
 .../node/restart/NodesRestartResponse.java    |  74 +++++++++
 .../restart/TransportNodesRestartAction.java  | 156 ++++++++++++++++++
 ...java => TransportNodesShutdownAction.java} |  30 +++-
 .../elasticsearch/bootstrap/Bootstrap.java    |  18 ++
 .../client/ClusterAdminClient.java            |  22 ++-
 .../org/elasticsearch/client/Requests.java    |  19 +++
 .../client/node/NodeClusterAdminClient.java   |  20 ++-
 .../action/ClientTransportActionModule.java   |   2 +
 .../ClientTransportNodesRestartAction.java    |  42 +++++
 .../InternalTransportClusterAdminClient.java  |  25 ++-
 .../rest/action/RestActionModule.java         |   2 +
 .../node/restart/RestNodesRestartAction.java  |  84 ++++++++++
 .../groovy/client/GClusterAdminClient.groovy  |  24 ++-
 16 files changed, 580 insertions(+), 17 deletions(-)
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartRequest.java
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartResponse.java
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/TransportNodesRestartAction.java
 rename modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/{TransportNodesShutdown.java => TransportNodesShutdownAction.java} (81%)
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/node/restart/ClientTransportNodesRestartAction.java
 create mode 100644 modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/restart/RestNodesRestartAction.java

diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java
index d00e4c37f06..9b0410ca693 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActionModule.java
@@ -21,7 +21,8 @@ package org.elasticsearch.action;
 
 import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthAction;
 import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
-import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdown;
+import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction;
+import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdownAction;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
 import org.elasticsearch.action.admin.cluster.ping.replication.TransportIndexReplicationPingAction;
 import org.elasticsearch.action.admin.cluster.ping.replication.TransportReplicationPingAction;
@@ -62,7 +63,8 @@ public class TransportActionModule extends AbstractModule {
     @Override protected void configure() {
 
         bind(TransportNodesInfo.class).asEagerSingleton();
-        bind(TransportNodesShutdown.class).asEagerSingleton();
+        bind(TransportNodesShutdownAction.class).asEagerSingleton();
+        bind(TransportNodesRestartAction.class).asEagerSingleton();
         bind(TransportClusterStateAction.class).asEagerSingleton();
         bind(TransportClusterHealthAction.class).asEagerSingleton();
 
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java
index dd1f71c3787..fedc48621b2 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/TransportActions.java
@@ -74,6 +74,7 @@ public class TransportActions {
             public static class Node {
                 public static final String INFO = "/cluster/nodes/info";
                 public static final String SHUTDOWN = "/cluster/nodes/shutdown";
+                public static final String RESTART = "/cluster/nodes/restart";
             }
 
             public static class Ping {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartRequest.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartRequest.java
new file mode 100644
index 00000000000..c636d9a1cd7
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.restart;
+
+import org.elasticsearch.action.support.nodes.NodesOperationRequest;
+import org.elasticsearch.util.TimeValue;
+import org.elasticsearch.util.io.stream.StreamInput;
+import org.elasticsearch.util.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+import static org.elasticsearch.util.TimeValue.*;
+
+/**
+ * A request to restart one ore more nodes (or the whole cluster).
+ *
+ * @author kimchy (shay.banon)
+ */
+public class NodesRestartRequest extends NodesOperationRequest {
+
+    TimeValue delay = TimeValue.timeValueSeconds(1);
+
+    protected NodesRestartRequest() {
+    }
+
+    /**
+     * Restarts down nodes based on the nodes ids specified. If none are passed, <b>all</b>
+     * nodes will be shutdown.
+     */
+    public NodesRestartRequest(String... nodesIds) {
+        super(nodesIds);
+    }
+
+    /**
+     * The delay for the restart to occur. Defaults to <tt>1s</tt>.
+     */
+    public NodesRestartRequest delay(TimeValue delay) {
+        this.delay = delay;
+        return this;
+    }
+
+    public TimeValue delay() {
+        return this.delay;
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        delay = readTimeValue(in);
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        delay.writeTo(out);
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartResponse.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartResponse.java
new file mode 100644
index 00000000000..23e901c26e3
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/NodesRestartResponse.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.restart;
+
+import org.elasticsearch.action.support.nodes.NodeOperationResponse;
+import org.elasticsearch.action.support.nodes.NodesOperationResponse;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.node.DiscoveryNode;
+import org.elasticsearch.util.io.stream.StreamInput;
+import org.elasticsearch.util.io.stream.StreamOutput;
+
+import java.io.IOException;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class NodesRestartResponse extends NodesOperationResponse<NodesRestartResponse.NodeRestartResponse> {
+
+    NodesRestartResponse() {
+    }
+
+    public NodesRestartResponse(ClusterName clusterName, NodeRestartResponse[] nodes) {
+        super(clusterName, nodes);
+    }
+
+    @Override public void readFrom(StreamInput in) throws IOException {
+        super.readFrom(in);
+        nodes = new NodeRestartResponse[in.readVInt()];
+        for (int i = 0; i < nodes.length; i++) {
+            nodes[i] = NodeRestartResponse.readNodeRestartResponse(in);
+        }
+    }
+
+    @Override public void writeTo(StreamOutput out) throws IOException {
+        super.writeTo(out);
+        out.writeVInt(nodes.length);
+        for (NodeRestartResponse node : nodes) {
+            node.writeTo(out);
+        }
+    }
+
+    public static class NodeRestartResponse extends NodeOperationResponse {
+
+        NodeRestartResponse() {
+        }
+
+        public NodeRestartResponse(DiscoveryNode node) {
+            super(node);
+        }
+
+        public static NodeRestartResponse readNodeRestartResponse(StreamInput in) throws IOException {
+            NodeRestartResponse res = new NodeRestartResponse();
+            res.readFrom(in);
+            return res;
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/TransportNodesRestartAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/TransportNodesRestartAction.java
new file mode 100644
index 00000000000..d5de001b9f2
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/restart/TransportNodesRestartAction.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.action.admin.cluster.node.restart;
+
+import org.elasticsearch.ElasticSearchException;
+import org.elasticsearch.ElasticSearchIllegalStateException;
+import org.elasticsearch.action.TransportActions;
+import org.elasticsearch.action.support.nodes.NodeOperationRequest;
+import org.elasticsearch.action.support.nodes.TransportNodesOperationAction;
+import org.elasticsearch.cluster.ClusterName;
+import org.elasticsearch.cluster.ClusterService;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.threadpool.ThreadPool;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.util.TimeValue;
+import org.elasticsearch.util.guice.inject.Inject;
+import org.elasticsearch.util.io.stream.StreamInput;
+import org.elasticsearch.util.io.stream.StreamOutput;
+import org.elasticsearch.util.settings.Settings;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+import static org.elasticsearch.util.TimeValue.*;
+import static org.elasticsearch.util.gcommon.collect.Lists.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class TransportNodesRestartAction extends TransportNodesOperationAction<NodesRestartRequest, NodesRestartResponse, TransportNodesRestartAction.NodeRestartRequest, NodesRestartResponse.NodeRestartResponse> {
+
+    private final Node node;
+
+    private final boolean disabled;
+
+    @Inject public TransportNodesRestartAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
+                                               ClusterService clusterService, TransportService transportService,
+                                               Node node) {
+        super(settings, clusterName, threadPool, clusterService, transportService);
+        this.node = node;
+        disabled = componentSettings.getAsBoolean("disabled", false);
+    }
+
+    @Override protected String transportAction() {
+        return TransportActions.Admin.Cluster.Node.RESTART;
+    }
+
+    @Override protected String transportNodeAction() {
+        return "/cluster/nodes/restart/node";
+    }
+
+    @Override protected NodesRestartResponse newResponse(NodesRestartRequest nodesShutdownRequest, AtomicReferenceArray responses) {
+        final List<NodesRestartResponse.NodeRestartResponse> nodeRestartResponses = newArrayList();
+        for (int i = 0; i < responses.length(); i++) {
+            Object resp = responses.get(i);
+            if (resp instanceof NodesRestartResponse.NodeRestartResponse) {
+                nodeRestartResponses.add((NodesRestartResponse.NodeRestartResponse) resp);
+            }
+        }
+        return new NodesRestartResponse(clusterName, nodeRestartResponses.toArray(new NodesRestartResponse.NodeRestartResponse[nodeRestartResponses.size()]));
+    }
+
+    @Override protected NodesRestartRequest newRequest() {
+        return new NodesRestartRequest();
+    }
+
+    @Override protected NodeRestartRequest newNodeRequest() {
+        return new NodeRestartRequest();
+    }
+
+    @Override protected NodeRestartRequest newNodeRequest(String nodeId, NodesRestartRequest request) {
+        return new NodeRestartRequest(nodeId, request.delay);
+    }
+
+    @Override protected NodesRestartResponse.NodeRestartResponse newNodeResponse() {
+        return new NodesRestartResponse.NodeRestartResponse();
+    }
+
+    @Override protected NodesRestartResponse.NodeRestartResponse nodeOperation(NodeRestartRequest request) throws ElasticSearchException {
+        if (disabled) {
+            throw new ElasticSearchIllegalStateException("Restart is disabled");
+        }
+        logger.info("Restarting in [{}]", request.delay);
+        threadPool.schedule(new Runnable() {
+            @Override public void run() {
+                boolean restartWithWrapper = false;
+                if (System.getProperty("elasticsearch-service") != null) {
+                    try {
+                        Class wrapperManager = settings.getClassLoader().loadClass("org.tanukisoftware.wrapper.WrapperManager");
+                        logger.info("Initiating requested restart (using service)");
+                        wrapperManager.getMethod("restartAndReturn").invoke(null);
+                        restartWithWrapper = true;
+                    } catch (Throwable e) {
+                        e.printStackTrace();
+                    }
+                }
+                if (!restartWithWrapper) {
+                    logger.info("Initiating requested restart");
+                    try {
+                        node.stop();
+                        node.start();
+                    } catch (Exception e) {
+                        logger.warn("Failed to restart", e);
+                    }
+                }
+            }
+        }, request.delay.millis(), TimeUnit.MILLISECONDS);
+        return new NodesRestartResponse.NodeRestartResponse(clusterService.state().nodes().localNode());
+    }
+
+    @Override protected boolean accumulateExceptions() {
+        return false;
+    }
+
+    protected static class NodeRestartRequest extends NodeOperationRequest {
+
+        TimeValue delay;
+
+        private NodeRestartRequest() {
+        }
+
+        private NodeRestartRequest(String nodeId, TimeValue delay) {
+            super(nodeId);
+            this.delay = delay;
+        }
+
+        @Override public void readFrom(StreamInput in) throws IOException {
+            super.readFrom(in);
+            delay = readTimeValue(in);
+        }
+
+        @Override public void writeTo(StreamOutput out) throws IOException {
+            super.writeTo(out);
+            delay.writeTo(out);
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdown.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java
similarity index 81%
rename from modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdown.java
rename to modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java
index a11c1f3d88d..276b2b6ae86 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdown.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/node/shutdown/TransportNodesShutdownAction.java
@@ -37,7 +37,6 @@ import org.elasticsearch.util.settings.Settings;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceArray;
 
 import static org.elasticsearch.util.TimeValue.*;
@@ -46,15 +45,15 @@ import static org.elasticsearch.util.gcommon.collect.Lists.*;
 /**
  * @author kimchy (shay.banon)
  */
-public class TransportNodesShutdown extends TransportNodesOperationAction<NodesShutdownRequest, NodesShutdownResponse, TransportNodesShutdown.NodeShutdownRequest, NodesShutdownResponse.NodeShutdownResponse> {
+public class TransportNodesShutdownAction extends TransportNodesOperationAction<NodesShutdownRequest, NodesShutdownResponse, TransportNodesShutdownAction.NodeShutdownRequest, NodesShutdownResponse.NodeShutdownResponse> {
 
     private final Node node;
 
     private final boolean disabled;
 
-    @Inject public TransportNodesShutdown(Settings settings, ClusterName clusterName, ThreadPool threadPool,
-                                          ClusterService clusterService, TransportService transportService,
-                                          Node node) {
+    @Inject public TransportNodesShutdownAction(Settings settings, ClusterName clusterName, ThreadPool threadPool,
+                                                ClusterService clusterService, TransportService transportService,
+                                                Node node) {
         super(settings, clusterName, threadPool, clusterService, transportService);
         this.node = node;
         disabled = componentSettings.getAsBoolean("disabled", false);
@@ -95,13 +94,18 @@ public class TransportNodesShutdown extends TransportNodesOperationAction<NodesS
         return new NodesShutdownResponse.NodeShutdownResponse();
     }
 
-    @Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(NodeShutdownRequest request) throws ElasticSearchException {
+    @Override protected NodesShutdownResponse.NodeShutdownResponse nodeOperation(final NodeShutdownRequest request) throws ElasticSearchException {
         if (disabled) {
             throw new ElasticSearchIllegalStateException("Shutdown is disabled");
         }
         logger.info("Shutting down in [{}]", request.delay);
-        threadPool.schedule(new Runnable() {
+        Thread t = new Thread(new Runnable() {
             @Override public void run() {
+                try {
+                    Thread.sleep(request.delay.millis());
+                } catch (InterruptedException e) {
+                    // ignore
+                }
                 boolean shutdownWithWrapper = false;
                 if (System.getProperty("elasticsearch-service") != null) {
                     try {
@@ -115,10 +119,18 @@ public class TransportNodesShutdown extends TransportNodesOperationAction<NodesS
                 }
                 if (!shutdownWithWrapper) {
                     logger.info("Initiating requested shutdown");
-                    node.close();
+                    try {
+                        node.close();
+                    } catch (Exception e) {
+                        logger.warn("Failed to shutdown", e);
+                    } finally {
+                        // make sure we initiate the shutdown hooks, so the Bootstrap#main thread will exit
+                        System.exit(0);
+                    }
                 }
             }
-        }, request.delay.millis(), TimeUnit.MILLISECONDS);
+        });
+        t.start();
         return new NodesShutdownResponse.NodeShutdownResponse(clusterService.state().nodes().localNode());
     }
 
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java b/modules/elasticsearch/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
index 18eb8d19948..72a05050285 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/bootstrap/Bootstrap.java
@@ -38,6 +38,7 @@ import org.elasticsearch.util.settings.Settings;
 
 import java.io.File;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 
 import static jline.ANSIBuffer.ANSICodes.*;
 import static org.elasticsearch.util.gcommon.collect.Sets.*;
@@ -165,6 +166,23 @@ public class Bootstrap {
             if (!foreground) {
                 System.err.close();
             }
+
+            // keep this thread alive (non daemon thread) until we shutdown
+            final CountDownLatch latch = new CountDownLatch(1);
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                @Override public void run() {
+                    latch.countDown();
+                }
+            });
+
+            while (true) {
+                try {
+                    latch.await();
+                } catch (InterruptedException e) {
+                    // bail out
+                }
+                break;
+            }
         } catch (Throwable e) {
             ESLogger logger = Loggers.getLogger(Bootstrap.class);
             if (bootstrap.node != null) {
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
index c7b77f37cd4..01451611bc6 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/ClusterAdminClient.java
@@ -25,6 +25,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
@@ -94,7 +96,7 @@ public interface ClusterAdminClient {
      *
      * @param request  The nodes info request
      * @param listener A listener to be notified with a result
-     * @see org.elasticsearch.client.Requests#nodesInfo(String...)
+     * @see org.elasticsearch.client.Requests#nodesShutdown(String...)
      */
     void nodesInfo(NodesInfoRequest request, ActionListener<NodesInfoResponse> listener);
 
@@ -116,6 +118,24 @@ public interface ClusterAdminClient {
      */
     void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener);
 
+    /**
+     * Restarts nodes in the cluster.
+     *
+     * @param request The nodes restart request
+     * @return The result future
+     * @see org.elasticsearch.client.Requests#nodesRestart(String...)
+     */
+    ActionFuture<NodesRestartResponse> nodesRestart(NodesRestartRequest request);
+
+    /**
+     * Restarts nodes in the cluster.
+     *
+     * @param request  The nodes restart request
+     * @param listener A listener to be notified with a result
+     * @see org.elasticsearch.client.Requests#nodesRestart(String...)
+     */
+    void nodesRestart(NodesRestartRequest request, ActionListener<NodesRestartResponse> listener);
+
     ActionFuture<SinglePingResponse> ping(SinglePingRequest request);
 
     void ping(SinglePingRequest request, ActionListener<SinglePingResponse> listener);
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java
index c427b6ac00c..42f18284aa8 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/Requests.java
@@ -21,6 +21,7 @@ package org.elasticsearch.client;
 
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
 import org.elasticsearch.action.admin.cluster.ping.replication.ReplicationPingRequest;
@@ -337,6 +338,24 @@ public class Requests {
         return new NodesShutdownRequest(nodesIds);
     }
 
+    /**
+     * Restarts all nodes in the cluster.
+     */
+    public static NodesRestartRequest nodesRestart() {
+        return new NodesRestartRequest();
+    }
+
+    /**
+     * Restarts specific nodes in the cluster.
+     *
+     * @param nodesIds The nodes ids to restart
+     * @return The nodes info request
+     * @see org.elasticsearch.client.ClusterAdminClient#nodesRestart(org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest)
+     */
+    public static NodesRestartRequest nodesRestart(String... nodesIds) {
+        return new NodesRestartRequest(nodesIds);
+    }
+
     public static SinglePingRequest pingSingleRequest(String index) {
         return new SinglePingRequest(index);
     }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java
index 38abdb34080..d2e71214257 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/node/NodeClusterAdminClient.java
@@ -27,9 +27,12 @@ import org.elasticsearch.action.admin.cluster.health.TransportClusterHealthActio
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
 import org.elasticsearch.action.admin.cluster.node.info.TransportNodesInfo;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
+import org.elasticsearch.action.admin.cluster.node.restart.TransportNodesRestartAction;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
-import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdown;
+import org.elasticsearch.action.admin.cluster.node.shutdown.TransportNodesShutdownAction;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingResponse;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.TransportBroadcastPingAction;
@@ -64,17 +67,20 @@ public class NodeClusterAdminClient extends AbstractComponent implements Cluster
 
     private final TransportNodesInfo nodesInfo;
 
-    private final TransportNodesShutdown nodesShutdown;
+    private final TransportNodesShutdownAction nodesShutdown;
+
+    private final TransportNodesRestartAction nodesRestart;
 
     @Inject public NodeClusterAdminClient(Settings settings,
                                           TransportClusterHealthAction clusterHealthAction, TransportClusterStateAction clusterStateAction,
                                           TransportSinglePingAction singlePingAction, TransportBroadcastPingAction broadcastPingAction, TransportReplicationPingAction replicationPingAction,
-                                          TransportNodesInfo nodesInfo, TransportNodesShutdown nodesShutdown) {
+                                          TransportNodesInfo nodesInfo, TransportNodesShutdownAction nodesShutdown, TransportNodesRestartAction nodesRestart) {
         super(settings);
         this.clusterHealthAction = clusterHealthAction;
         this.clusterStateAction = clusterStateAction;
         this.nodesInfo = nodesInfo;
         this.nodesShutdown = nodesShutdown;
+        this.nodesRestart = nodesRestart;
         this.singlePingAction = singlePingAction;
         this.broadcastPingAction = broadcastPingAction;
         this.replicationPingAction = replicationPingAction;
@@ -135,4 +141,12 @@ public class NodeClusterAdminClient extends AbstractComponent implements Cluster
     @Override public void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener) {
         nodesShutdown.execute(request, listener);
     }
+
+    @Override public ActionFuture<NodesRestartResponse> nodesRestart(NodesRestartRequest request) {
+        return nodesRestart.execute(request);
+    }
+
+    @Override public void nodesRestart(NodesRestartRequest request, ActionListener<NodesRestartResponse> listener) {
+        nodesRestart.execute(request, listener);
+    }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java
index 87e3b9624e1..2d73b8b283c 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/ClientTransportActionModule.java
@@ -21,6 +21,7 @@ package org.elasticsearch.client.transport.action;
 
 import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction;
 import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction;
+import org.elasticsearch.client.transport.action.admin.cluster.node.restart.ClientTransportNodesRestartAction;
 import org.elasticsearch.client.transport.action.admin.cluster.node.shutdown.ClientTransportNodesShutdownAction;
 import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
 import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
@@ -74,6 +75,7 @@ public class ClientTransportActionModule extends AbstractModule {
 
         bind(ClientTransportNodesInfoAction.class).asEagerSingleton();
         bind(ClientTransportNodesShutdownAction.class).asEagerSingleton();
+        bind(ClientTransportNodesRestartAction.class).asEagerSingleton();
         bind(ClientTransportSinglePingAction.class).asEagerSingleton();
         bind(ClientTransportReplicationPingAction.class).asEagerSingleton();
         bind(ClientTransportBroadcastPingAction.class).asEagerSingleton();
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/node/restart/ClientTransportNodesRestartAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/node/restart/ClientTransportNodesRestartAction.java
new file mode 100644
index 00000000000..a9942038e9b
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/action/admin/cluster/node/restart/ClientTransportNodesRestartAction.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.client.transport.action.admin.cluster.node.restart;
+
+import org.elasticsearch.action.TransportActions;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
+import org.elasticsearch.client.transport.action.support.BaseClientTransportAction;
+import org.elasticsearch.transport.TransportService;
+import org.elasticsearch.util.guice.inject.Inject;
+import org.elasticsearch.util.settings.Settings;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class ClientTransportNodesRestartAction extends BaseClientTransportAction<NodesRestartRequest, NodesRestartResponse> {
+
+    @Inject public ClientTransportNodesRestartAction(Settings settings, TransportService transportService) {
+        super(settings, transportService, NodesRestartResponse.class);
+    }
+
+    @Override protected String action() {
+        return TransportActions.Admin.Cluster.Node.RESTART;
+    }
+}
\ No newline at end of file
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java
index a9ae9d5fea1..138c6ef4390 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/client/transport/support/InternalTransportClusterAdminClient.java
@@ -26,6 +26,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest;
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest;
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse;
 import org.elasticsearch.action.admin.cluster.ping.broadcast.BroadcastPingRequest;
@@ -40,6 +42,7 @@ import org.elasticsearch.client.ClusterAdminClient;
 import org.elasticsearch.client.transport.TransportClientNodesService;
 import org.elasticsearch.client.transport.action.admin.cluster.health.ClientTransportClusterHealthAction;
 import org.elasticsearch.client.transport.action.admin.cluster.node.info.ClientTransportNodesInfoAction;
+import org.elasticsearch.client.transport.action.admin.cluster.node.restart.ClientTransportNodesRestartAction;
 import org.elasticsearch.client.transport.action.admin.cluster.node.shutdown.ClientTransportNodesShutdownAction;
 import org.elasticsearch.client.transport.action.admin.cluster.ping.broadcast.ClientTransportBroadcastPingAction;
 import org.elasticsearch.client.transport.action.admin.cluster.ping.replication.ClientTransportReplicationPingAction;
@@ -71,16 +74,19 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
 
     private final ClientTransportNodesShutdownAction nodesShutdownAction;
 
+    private final ClientTransportNodesRestartAction nodesRestartAction;
+
     @Inject public InternalTransportClusterAdminClient(Settings settings, TransportClientNodesService nodesService,
                                                        ClientTransportClusterHealthAction clusterHealthAction, ClientTransportClusterStateAction clusterStateAction,
                                                        ClientTransportSinglePingAction singlePingAction, ClientTransportReplicationPingAction replicationPingAction, ClientTransportBroadcastPingAction broadcastPingAction,
-                                                       ClientTransportNodesInfoAction nodesInfoAction, ClientTransportNodesShutdownAction nodesShutdownAction) {
+                                                       ClientTransportNodesInfoAction nodesInfoAction, ClientTransportNodesShutdownAction nodesShutdownAction, ClientTransportNodesRestartAction nodesRestartAction) {
         super(settings);
         this.nodesService = nodesService;
         this.clusterHealthAction = clusterHealthAction;
         this.clusterStateAction = clusterStateAction;
         this.nodesInfoAction = nodesInfoAction;
         this.nodesShutdownAction = nodesShutdownAction;
+        this.nodesRestartAction = nodesRestartAction;
         this.singlePingAction = singlePingAction;
         this.replicationPingAction = replicationPingAction;
         this.broadcastPingAction = broadcastPingAction;
@@ -204,4 +210,21 @@ public class InternalTransportClusterAdminClient extends AbstractComponent imple
             }
         });
     }
+
+    @Override public ActionFuture<NodesRestartResponse> nodesRestart(final NodesRestartRequest request) {
+        return nodesService.execute(new TransportClientNodesService.NodeCallback<org.elasticsearch.action.ActionFuture<org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse>>() {
+            @Override public ActionFuture<NodesRestartResponse> doWithNode(DiscoveryNode node) throws ElasticSearchException {
+                return nodesRestartAction.execute(node, request);
+            }
+        });
+    }
+
+    @Override public void nodesRestart(final NodesRestartRequest request, final ActionListener<NodesRestartResponse> listener) {
+        nodesService.execute(new TransportClientNodesService.NodeCallback<ActionFuture<Void>>() {
+            @Override public ActionFuture<Void> doWithNode(DiscoveryNode node) throws ElasticSearchException {
+                nodesRestartAction.execute(node, request, listener);
+                return null;
+            }
+        });
+    }
 }
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java
index 821e92efbe2..89e1bc9fa0f 100644
--- a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/RestActionModule.java
@@ -21,6 +21,7 @@ package org.elasticsearch.rest.action;
 
 import org.elasticsearch.rest.action.admin.cluster.health.RestClusterHealthAction;
 import org.elasticsearch.rest.action.admin.cluster.node.info.RestNodesInfoAction;
+import org.elasticsearch.rest.action.admin.cluster.node.restart.RestNodesRestartAction;
 import org.elasticsearch.rest.action.admin.cluster.node.shutdown.RestNodesShutdownAction;
 import org.elasticsearch.rest.action.admin.cluster.ping.broadcast.RestBroadcastPingAction;
 import org.elasticsearch.rest.action.admin.cluster.ping.replication.RestReplicationPingAction;
@@ -58,6 +59,7 @@ public class RestActionModule extends AbstractModule {
 
         bind(RestNodesInfoAction.class).asEagerSingleton();
         bind(RestNodesShutdownAction.class).asEagerSingleton();
+        bind(RestNodesRestartAction.class).asEagerSingleton();
         bind(RestClusterStateAction.class).asEagerSingleton();
         bind(RestClusterHealthAction.class).asEagerSingleton();
 
diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/restart/RestNodesRestartAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/restart/RestNodesRestartAction.java
new file mode 100644
index 00000000000..f006dc3ff45
--- /dev/null
+++ b/modules/elasticsearch/src/main/java/org/elasticsearch/rest/action/admin/cluster/node/restart/RestNodesRestartAction.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to Elastic Search and Shay Banon under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Elastic Search licenses this
+ * file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.elasticsearch.rest.action.admin.cluster.node.restart;
+
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest;
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.rest.*;
+import org.elasticsearch.rest.action.support.RestActions;
+import org.elasticsearch.util.guice.inject.Inject;
+import org.elasticsearch.util.settings.Settings;
+import org.elasticsearch.util.xcontent.builder.XContentBuilder;
+
+import java.io.IOException;
+
+import static org.elasticsearch.rest.action.support.RestXContentBuilder.*;
+
+/**
+ * @author kimchy (shay.banon)
+ */
+public class RestNodesRestartAction extends BaseRestHandler {
+
+    @Inject public RestNodesRestartAction(Settings settings, Client client, RestController controller) {
+        super(settings, client);
+
+        controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/_restart", this);
+        controller.registerHandler(RestRequest.Method.POST, "/_cluster/nodes/{nodeId}/_restart", this);
+    }
+
+    @Override public void handleRequest(final RestRequest request, final RestChannel channel) {
+        String[] nodesIds = RestActions.splitNodes(request.param("nodeId"));
+        NodesRestartRequest nodesRestartRequest = new NodesRestartRequest(nodesIds);
+        nodesRestartRequest.listenerThreaded(false);
+        nodesRestartRequest.delay(request.paramAsTime("delay", nodesRestartRequest.delay()));
+        client.admin().cluster().nodesRestart(nodesRestartRequest, new ActionListener<NodesRestartResponse>() {
+            @Override public void onResponse(NodesRestartResponse result) {
+                try {
+                    XContentBuilder builder = restContentBuilder(request);
+                    builder.startObject();
+                    builder.field("cluster_name", result.clusterName().value());
+
+                    builder.startObject("nodes");
+                    for (NodesRestartResponse.NodeRestartResponse nodeInfo : result) {
+                        builder.startObject(nodeInfo.node().id());
+                        builder.field("name", nodeInfo.node().name());
+                        builder.endObject();
+                    }
+                    builder.endObject();
+
+                    builder.endObject();
+                    channel.sendResponse(new JsonRestResponse(request, RestResponse.Status.OK, builder));
+                } catch (Exception e) {
+                    onFailure(e);
+                }
+            }
+
+            @Override public void onFailure(Throwable e) {
+                try {
+                    channel.sendResponse(new JsonThrowableRestResponse(request, e));
+                } catch (IOException e1) {
+                    logger.error("Failed to send failure response", e1);
+                }
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/plugins/client/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy b/plugins/client/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy
index bdcd4edb533..4bdc1abc58c 100644
--- a/plugins/client/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy
+++ b/plugins/client/groovy/src/main/groovy/org/elasticsearch/groovy/client/GClusterAdminClient.groovy
@@ -24,6 +24,8 @@ import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoRequest
 import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartRequest
+import org.elasticsearch.action.admin.cluster.node.restart.NodesRestartResponse
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownRequest
 import org.elasticsearch.action.admin.cluster.node.shutdown.NodesShutdownResponse
 import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest
@@ -109,7 +111,7 @@ class GClusterAdminClient {
         clusterAdminClient.nodesInfo(request, listener)
     }
 
-    // NODES INFO
+    // NODES SHUTDOWN
 
     GActionFuture<NodesShutdownResponse> nodesShutdown(Closure c) {
         NodesShutdownRequest request = new NodesShutdownRequest()
@@ -128,4 +130,24 @@ class GClusterAdminClient {
     void nodesShutdown(NodesShutdownRequest request, ActionListener<NodesShutdownResponse> listener) {
         clusterAdminClient.nodesShutdown(request, listener)
     }
+
+    // NODES RESTART
+
+    GActionFuture<NodesRestartResponse> nodesRestart(Closure c) {
+        NodesRestartRequest request = new NodesRestartRequest()
+        c.setDelegate request
+        c.resolveStrategy = gClient.resolveStrategy
+        c.call()
+        nodesRestart(request)
+    }
+
+    GActionFuture<NodesRestartResponse> nodesRestart(NodesRestartRequest request) {
+        GActionFuture<NodesRestartResponse> future = new GActionFuture<NodesRestartResponse>(internalClient.threadPool(), request);
+        clusterAdminClient.nodesRestart(request, future)
+        return future
+    }
+
+    void nodesRestart(NodesRestartRequest request, ActionListener<NodesRestartResponse> listener) {
+        clusterAdminClient.nodesRestart(request, listener)
+    }
 }