diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index bf218b943a9..e2f75214242 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -275,6 +275,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3964. ResourceManager does not have JVM metrics (Jason Lowe via
bobby)
+ MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from
+ ResourceManager. (Devaraj K & Eric Payne via acmurthy)
+
Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 799e20092d4..c22b351b89d 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -198,8 +198,12 @@
+
+
+
+
+
-
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 8f875e117ae..5e856e5c997 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -60,7 +60,8 @@ public class NodeManager extends CompositeService implements
private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker;
private LocalDirsHandlerService dirsHandler;
-
+ private static CompositeServiceShutdownHook nodeManagerShutdownHook;
+
public NodeManager() {
super(NodeManager.class.getName());
}
@@ -226,25 +227,52 @@ public NodeHealthCheckerService getNodeHealthChecker() {
@Override
public void stateChanged(Service service) {
- // Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
&& STATE.STOPPED.equals(service.getServiceState())) {
+
+ boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
+
+ // Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
stop();
+
+ // Reboot the whole node-manager if NodeStatusUpdater got a reboot command
+ // from the RM.
+ if (hasToReboot) {
+ LOG.info("Rebooting the node manager.");
+ NodeManager nodeManager = createNewNodeManager();
+ nodeManager.initAndStartNodeManager(hasToReboot);
+ }
}
}
- public static void main(String[] args) {
- StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
+ private void initAndStartNodeManager(boolean hasToReboot) {
try {
- NodeManager nodeManager = new NodeManager();
- Runtime.getRuntime().addShutdownHook(
- new CompositeServiceShutdownHook(nodeManager));
+
+ // Remove the old hook if we are rebooting.
+ if (hasToReboot && null != nodeManagerShutdownHook) {
+ Runtime.getRuntime().removeShutdownHook(nodeManagerShutdownHook);
+ }
+
+ nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
+ Runtime.getRuntime().addShutdownHook(nodeManagerShutdownHook);
+
YarnConfiguration conf = new YarnConfiguration();
- nodeManager.init(conf);
- nodeManager.start();
+ this.init(conf);
+ this.start();
} catch (Throwable t) {
LOG.fatal("Error starting NodeManager", t);
System.exit(-1);
}
}
+
+ // For testing
+ NodeManager createNewNodeManager() {
+ return new NodeManager();
+ }
+
+ public static void main(String[] args) {
+ StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
+ NodeManager nodeManager = new NodeManager();
+ nodeManager.initAndStartNodeManager(false);
+ }
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index f0007a657aa..6651c299aa5 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -91,6 +91,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
+ private boolean hasToRebootNode;
+
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager) {
@@ -156,6 +158,18 @@ public synchronized void stop() {
this.isStopped = true;
super.stop();
}
+
+ private synchronized void reboot() {
+ this.hasToRebootNode = true;
+ // Stop the status-updater. This will trigger a sub-service state change in
+ // the NodeManager which will then decide to reboot or not based on
+ // isRebooted.
+ this.stop();
+ }
+
+ synchronized boolean hasToRebootNode() {
+ return this.hasToRebootNode;
+ }
protected boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
@@ -336,8 +350,8 @@ public void run() {
}
if (response.getNodeAction() == NodeAction.REBOOT) {
LOG.info("Node is out of sync with ResourceManager,"
- + " hence shutting down.");
- NodeStatusUpdaterImpl.this.stop();
+ + " hence rebooting.");
+ NodeStatusUpdaterImpl.this.reboot();
break;
}
diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index cfb32679a63..e19bc28939f 100644
--- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager;
+import static org.mockito.Mockito.mock;
+
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
@@ -71,7 +73,6 @@
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import static org.mockito.Mockito.mock;
public class TestNodeStatusUpdater {
@@ -91,6 +92,7 @@ public class TestNodeStatusUpdater {
private final List registeredNodes = new ArrayList();
private final Configuration conf = new YarnConfiguration();
private NodeManager nm;
+ protected NodeManager rebootedNodeManager;
@After
public void tearDown() {
@@ -496,8 +498,28 @@ public void testNodeReboot() throws Exception {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
-
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
+
+ waitCount = 0;
+ while (null == rebootedNodeManager && waitCount++ != 20) {
+ LOG.info("Waiting for NM to reinitialize..");
+ Thread.sleep(1000);
+ }
+
+ waitCount = 0;
+ while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to start..");
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
+
+ rebootedNodeManager.stop();
+ waitCount = 0;
+ while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
+ LOG.info("Waiting for NM to stop..");
+ Thread.sleep(1000);
+ }
+ Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
}
@Test
@@ -642,6 +664,12 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
+
+ @Override
+ NodeManager createNewNodeManager() {
+ rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
+ return rebootedNodeManager;
+ }
};
}
}