From b9fd9e17598c606d0acd54a68b4693f482ffb3ac Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 6 Mar 2012 00:52:10 +0000 Subject: [PATCH] MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from ResourceManager. Contributed by Devaraj K & Eric Payne. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297310 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../dev-support/findbugs-exclude.xml | 6 ++- .../yarn/server/nodemanager/NodeManager.java | 46 +++++++++++++++---- .../nodemanager/NodeStatusUpdaterImpl.java | 18 +++++++- .../nodemanager/TestNodeStatusUpdater.java | 32 ++++++++++++- 5 files changed, 91 insertions(+), 14 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index bf218b943a9..e2f75214242 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -275,6 +275,9 @@ Release 0.23.2 - UNRELEASED MAPREDUCE-3964. ResourceManager does not have JVM metrics (Jason Lowe via bobby) + MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from + ResourceManager. (Devaraj K & Eric Payne via acmurthy) + Release 0.23.1 - 2012-02-17 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 799e20092d4..c22b351b89d 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -198,8 +198,12 @@ + + + + + - diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 8f875e117ae..5e856e5c997 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -60,7 +60,8 @@ public class NodeManager extends CompositeService implements private ApplicationACLsManager aclsManager; private NodeHealthCheckerService nodeHealthChecker; private LocalDirsHandlerService dirsHandler; - + private static CompositeServiceShutdownHook nodeManagerShutdownHook; + public NodeManager() { super(NodeManager.class.getName()); } @@ -226,25 +227,52 @@ public class NodeManager extends CompositeService implements @Override public void stateChanged(Service service) { - // Shutdown the Nodemanager when the NodeStatusUpdater is stopped. if (NodeStatusUpdaterImpl.class.getName().equals(service.getName()) && STATE.STOPPED.equals(service.getServiceState())) { + + boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode(); + + // Shutdown the Nodemanager when the NodeStatusUpdater is stopped. stop(); + + // Reboot the whole node-manager if NodeStatusUpdater got a reboot command + // from the RM. + if (hasToReboot) { + LOG.info("Rebooting the node manager."); + NodeManager nodeManager = createNewNodeManager(); + nodeManager.initAndStartNodeManager(hasToReboot); + } } } - public static void main(String[] args) { - StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); + private void initAndStartNodeManager(boolean hasToReboot) { try { - NodeManager nodeManager = new NodeManager(); - Runtime.getRuntime().addShutdownHook( - new CompositeServiceShutdownHook(nodeManager)); + + // Remove the old hook if we are rebooting. + if (hasToReboot && null != nodeManagerShutdownHook) { + Runtime.getRuntime().removeShutdownHook(nodeManagerShutdownHook); + } + + nodeManagerShutdownHook = new CompositeServiceShutdownHook(this); + Runtime.getRuntime().addShutdownHook(nodeManagerShutdownHook); + YarnConfiguration conf = new YarnConfiguration(); - nodeManager.init(conf); - nodeManager.start(); + this.init(conf); + this.start(); } catch (Throwable t) { LOG.fatal("Error starting NodeManager", t); System.exit(-1); } } + + // For testing + NodeManager createNewNodeManager() { + return new NodeManager(); + } + + public static void main(String[] args) { + StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); + NodeManager nodeManager = new NodeManager(); + nodeManager.initAndStartNodeManager(false); + } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f0007a657aa..6651c299aa5 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -91,6 +91,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; + private boolean hasToRebootNode; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, ContainerTokenSecretManager containerTokenSecretManager) { @@ -156,6 +158,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.isStopped = true; super.stop(); } + + private synchronized void reboot() { + this.hasToRebootNode = true; + // Stop the status-updater. This will trigger a sub-service state change in + // the NodeManager which will then decide to reboot or not based on + // isRebooted. + this.stop(); + } + + synchronized boolean hasToRebootNode() { + return this.hasToRebootNode; + } protected boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); @@ -336,8 +350,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } if (response.getNodeAction() == NodeAction.REBOOT) { LOG.info("Node is out of sync with ResourceManager," - + " hence shutting down."); - NodeStatusUpdaterImpl.this.stop(); + + " hence rebooting."); + NodeStatusUpdaterImpl.this.reboot(); break; } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index cfb32679a63..e19bc28939f 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.nodemanager; +import static org.mockito.Mockito.mock; + import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -71,7 +73,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import static org.mockito.Mockito.mock; public class TestNodeStatusUpdater { @@ -91,6 +92,7 @@ public class TestNodeStatusUpdater { private final List registeredNodes = new ArrayList(); private final Configuration conf = new YarnConfiguration(); private NodeManager nm; + protected NodeManager rebootedNodeManager; @After public void tearDown() { @@ -496,8 +498,28 @@ public class TestNodeStatusUpdater { LOG.info("Waiting for NM to stop.."); Thread.sleep(1000); } - Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); + + waitCount = 0; + while (null == rebootedNodeManager && waitCount++ != 20) { + LOG.info("Waiting for NM to reinitialize.."); + Thread.sleep(1000); + } + + waitCount = 0; + while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) { + LOG.info("Waiting for NM to start.."); + Thread.sleep(1000); + } + Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState()); + + rebootedNodeManager.stop(); + waitCount = 0; + while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) { + LOG.info("Waiting for NM to stop.."); + Thread.sleep(1000); + } + Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState()); } @Test @@ -642,6 +664,12 @@ public class TestNodeStatusUpdater { myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } + + @Override + NodeManager createNewNodeManager() { + rebootedNodeManager = getNodeManager(NodeAction.NORMAL); + return rebootedNodeManager; + } }; } }