MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from ResourceManager. Contributed by Devaraj K & Eric Payne.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1297310 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2012-03-06 00:52:10 +00:00
parent 739f688f5a
commit b9fd9e1759
5 changed files with 91 additions and 14 deletions

View File

@ -275,6 +275,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-3964. ResourceManager does not have JVM metrics (Jason Lowe via MAPREDUCE-3964. ResourceManager does not have JVM metrics (Jason Lowe via
bobby) bobby)
MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from
ResourceManager. (Devaraj K & Eric Payne via acmurthy)
Release 0.23.1 - 2012-02-17 Release 0.23.1 - 2012-02-17
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -198,7 +198,11 @@
<Method name="run" /> <Method name="run" />
<Bug pattern="DM_EXIT" /> <Bug pattern="DM_EXIT" />
</Match> </Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.nodemanager.NodeManager" />
<Method name="initAndStartNodeManager" />
<Bug pattern="DM_EXIT" />
</Match>
<!-- Ignore heartbeat exception when killing localizer --> <!-- Ignore heartbeat exception when killing localizer -->
<Match> <Match>

View File

@ -60,6 +60,7 @@ public class NodeManager extends CompositeService implements
private ApplicationACLsManager aclsManager; private ApplicationACLsManager aclsManager;
private NodeHealthCheckerService nodeHealthChecker; private NodeHealthCheckerService nodeHealthChecker;
private LocalDirsHandlerService dirsHandler; private LocalDirsHandlerService dirsHandler;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
public NodeManager() { public NodeManager() {
super(NodeManager.class.getName()); super(NodeManager.class.getName());
@ -226,25 +227,52 @@ public NodeHealthCheckerService getNodeHealthChecker() {
@Override @Override
public void stateChanged(Service service) { public void stateChanged(Service service) {
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName()) if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
&& STATE.STOPPED.equals(service.getServiceState())) { && STATE.STOPPED.equals(service.getServiceState())) {
boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
stop(); stop();
// Reboot the whole node-manager if NodeStatusUpdater got a reboot command
// from the RM.
if (hasToReboot) {
LOG.info("Rebooting the node manager.");
NodeManager nodeManager = createNewNodeManager();
nodeManager.initAndStartNodeManager(hasToReboot);
}
} }
} }
public static void main(String[] args) { private void initAndStartNodeManager(boolean hasToReboot) {
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
try { try {
NodeManager nodeManager = new NodeManager();
Runtime.getRuntime().addShutdownHook( // Remove the old hook if we are rebooting.
new CompositeServiceShutdownHook(nodeManager)); if (hasToReboot && null != nodeManagerShutdownHook) {
Runtime.getRuntime().removeShutdownHook(nodeManagerShutdownHook);
}
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
Runtime.getRuntime().addShutdownHook(nodeManagerShutdownHook);
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
nodeManager.init(conf); this.init(conf);
nodeManager.start(); this.start();
} catch (Throwable t) { } catch (Throwable t) {
LOG.fatal("Error starting NodeManager", t); LOG.fatal("Error starting NodeManager", t);
System.exit(-1); System.exit(-1);
} }
} }
// For testing
NodeManager createNewNodeManager() {
return new NodeManager();
}
public static void main(String[] args) {
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
NodeManager nodeManager = new NodeManager();
nodeManager.initAndStartNodeManager(false);
}
} }

View File

@ -91,6 +91,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private final NodeHealthCheckerService healthChecker; private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics; private final NodeManagerMetrics metrics;
private boolean hasToRebootNode;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
ContainerTokenSecretManager containerTokenSecretManager) { ContainerTokenSecretManager containerTokenSecretManager) {
@ -157,6 +159,18 @@ public synchronized void stop() {
super.stop(); super.stop();
} }
private synchronized void reboot() {
this.hasToRebootNode = true;
// Stop the status-updater. This will trigger a sub-service state change in
// the NodeManager which will then decide to reboot or not based on
// isRebooted.
this.stop();
}
synchronized boolean hasToRebootNode() {
return this.hasToRebootNode;
}
protected boolean isSecurityEnabled() { protected boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled(); return UserGroupInformation.isSecurityEnabled();
} }
@ -336,8 +350,8 @@ public void run() {
} }
if (response.getNodeAction() == NodeAction.REBOOT) { if (response.getNodeAction() == NodeAction.REBOOT) {
LOG.info("Node is out of sync with ResourceManager," LOG.info("Node is out of sync with ResourceManager,"
+ " hence shutting down."); + " hence rebooting.");
NodeStatusUpdaterImpl.this.stop(); NodeStatusUpdaterImpl.this.reboot();
break; break;
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.nodemanager; package org.apache.hadoop.yarn.server.nodemanager;
import static org.mockito.Mockito.mock;
import java.io.IOException; import java.io.IOException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
@ -71,7 +73,6 @@
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.mockito.Mockito.mock;
public class TestNodeStatusUpdater { public class TestNodeStatusUpdater {
@ -91,6 +92,7 @@ public class TestNodeStatusUpdater {
private final List<NodeId> registeredNodes = new ArrayList<NodeId>(); private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = new YarnConfiguration(); private final Configuration conf = new YarnConfiguration();
private NodeManager nm; private NodeManager nm;
protected NodeManager rebootedNodeManager;
@After @After
public void tearDown() { public void tearDown() {
@ -496,8 +498,28 @@ public void testNodeReboot() throws Exception {
LOG.info("Waiting for NM to stop.."); LOG.info("Waiting for NM to stop..");
Thread.sleep(1000); Thread.sleep(1000);
} }
Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
waitCount = 0;
while (null == rebootedNodeManager && waitCount++ != 20) {
LOG.info("Waiting for NM to reinitialize..");
Thread.sleep(1000);
}
waitCount = 0;
while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
LOG.info("Waiting for NM to start..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
rebootedNodeManager.stop();
waitCount = 0;
while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
} }
@Test @Test
@ -642,6 +664,12 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
myNodeStatusUpdater.resourceTracker = myResourceTracker2; myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater; return myNodeStatusUpdater;
} }
@Override
NodeManager createNewNodeManager() {
rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
return rebootedNodeManager;
}
}; };
} }
} }