Merge -c 1297310 from trunk to branch-0.23 to fix MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from ResourceManager. Contributed by Devaraj K & Eric Payne.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1297311 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a7899dae53
commit
3d48d8cf6e
|
@ -191,6 +191,9 @@ Release 0.23.2 - UNRELEASED
|
||||||
MAPREDUCE-3964. ResourceManager does not have JVM metrics (Jason Lowe via
|
MAPREDUCE-3964. ResourceManager does not have JVM metrics (Jason Lowe via
|
||||||
bobby)
|
bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-3034. Ensure NodeManager reboots itself on direction from
|
||||||
|
ResourceManager. (Devaraj K & Eric Payne via acmurthy)
|
||||||
|
|
||||||
Release 0.23.1 - 2012-02-17
|
Release 0.23.1 - 2012-02-17
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
|
@ -198,7 +198,11 @@
|
||||||
<Method name="run" />
|
<Method name="run" />
|
||||||
<Bug pattern="DM_EXIT" />
|
<Bug pattern="DM_EXIT" />
|
||||||
</Match>
|
</Match>
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.yarn.server.nodemanager.NodeManager" />
|
||||||
|
<Method name="initAndStartNodeManager" />
|
||||||
|
<Bug pattern="DM_EXIT" />
|
||||||
|
</Match>
|
||||||
|
|
||||||
<!-- Ignore heartbeat exception when killing localizer -->
|
<!-- Ignore heartbeat exception when killing localizer -->
|
||||||
<Match>
|
<Match>
|
||||||
|
|
|
@ -60,6 +60,7 @@ public class NodeManager extends CompositeService implements
|
||||||
private ApplicationACLsManager aclsManager;
|
private ApplicationACLsManager aclsManager;
|
||||||
private NodeHealthCheckerService nodeHealthChecker;
|
private NodeHealthCheckerService nodeHealthChecker;
|
||||||
private LocalDirsHandlerService dirsHandler;
|
private LocalDirsHandlerService dirsHandler;
|
||||||
|
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
|
||||||
|
|
||||||
public NodeManager() {
|
public NodeManager() {
|
||||||
super(NodeManager.class.getName());
|
super(NodeManager.class.getName());
|
||||||
|
@ -226,25 +227,52 @@ public class NodeManager extends CompositeService implements
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void stateChanged(Service service) {
|
public void stateChanged(Service service) {
|
||||||
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
|
|
||||||
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
|
if (NodeStatusUpdaterImpl.class.getName().equals(service.getName())
|
||||||
&& STATE.STOPPED.equals(service.getServiceState())) {
|
&& STATE.STOPPED.equals(service.getServiceState())) {
|
||||||
|
|
||||||
|
boolean hasToReboot = ((NodeStatusUpdaterImpl) service).hasToRebootNode();
|
||||||
|
|
||||||
|
// Shutdown the Nodemanager when the NodeStatusUpdater is stopped.
|
||||||
stop();
|
stop();
|
||||||
|
|
||||||
|
// Reboot the whole node-manager if NodeStatusUpdater got a reboot command
|
||||||
|
// from the RM.
|
||||||
|
if (hasToReboot) {
|
||||||
|
LOG.info("Rebooting the node manager.");
|
||||||
|
NodeManager nodeManager = createNewNodeManager();
|
||||||
|
nodeManager.initAndStartNodeManager(hasToReboot);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void main(String[] args) {
|
private void initAndStartNodeManager(boolean hasToReboot) {
|
||||||
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
|
|
||||||
try {
|
try {
|
||||||
NodeManager nodeManager = new NodeManager();
|
|
||||||
Runtime.getRuntime().addShutdownHook(
|
// Remove the old hook if we are rebooting.
|
||||||
new CompositeServiceShutdownHook(nodeManager));
|
if (hasToReboot && null != nodeManagerShutdownHook) {
|
||||||
|
Runtime.getRuntime().removeShutdownHook(nodeManagerShutdownHook);
|
||||||
|
}
|
||||||
|
|
||||||
|
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
|
||||||
|
Runtime.getRuntime().addShutdownHook(nodeManagerShutdownHook);
|
||||||
|
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
nodeManager.init(conf);
|
this.init(conf);
|
||||||
nodeManager.start();
|
this.start();
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.fatal("Error starting NodeManager", t);
|
LOG.fatal("Error starting NodeManager", t);
|
||||||
System.exit(-1);
|
System.exit(-1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For testing
|
||||||
|
NodeManager createNewNodeManager() {
|
||||||
|
return new NodeManager();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
|
||||||
|
NodeManager nodeManager = new NodeManager();
|
||||||
|
nodeManager.initAndStartNodeManager(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -91,6 +91,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
private final NodeHealthCheckerService healthChecker;
|
private final NodeHealthCheckerService healthChecker;
|
||||||
private final NodeManagerMetrics metrics;
|
private final NodeManagerMetrics metrics;
|
||||||
|
|
||||||
|
private boolean hasToRebootNode;
|
||||||
|
|
||||||
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
|
||||||
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
|
||||||
ContainerTokenSecretManager containerTokenSecretManager) {
|
ContainerTokenSecretManager containerTokenSecretManager) {
|
||||||
|
@ -157,6 +159,18 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
super.stop();
|
super.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized void reboot() {
|
||||||
|
this.hasToRebootNode = true;
|
||||||
|
// Stop the status-updater. This will trigger a sub-service state change in
|
||||||
|
// the NodeManager which will then decide to reboot or not based on
|
||||||
|
// isRebooted.
|
||||||
|
this.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
synchronized boolean hasToRebootNode() {
|
||||||
|
return this.hasToRebootNode;
|
||||||
|
}
|
||||||
|
|
||||||
protected boolean isSecurityEnabled() {
|
protected boolean isSecurityEnabled() {
|
||||||
return UserGroupInformation.isSecurityEnabled();
|
return UserGroupInformation.isSecurityEnabled();
|
||||||
}
|
}
|
||||||
|
@ -336,8 +350,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
}
|
}
|
||||||
if (response.getNodeAction() == NodeAction.REBOOT) {
|
if (response.getNodeAction() == NodeAction.REBOOT) {
|
||||||
LOG.info("Node is out of sync with ResourceManager,"
|
LOG.info("Node is out of sync with ResourceManager,"
|
||||||
+ " hence shutting down.");
|
+ " hence rebooting.");
|
||||||
NodeStatusUpdaterImpl.this.stop();
|
NodeStatusUpdaterImpl.this.reboot();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager;
|
package org.apache.hadoop.yarn.server.nodemanager;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
@ -71,7 +73,6 @@ import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
|
|
||||||
public class TestNodeStatusUpdater {
|
public class TestNodeStatusUpdater {
|
||||||
|
|
||||||
|
@ -91,6 +92,7 @@ public class TestNodeStatusUpdater {
|
||||||
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
|
||||||
private final Configuration conf = new YarnConfiguration();
|
private final Configuration conf = new YarnConfiguration();
|
||||||
private NodeManager nm;
|
private NodeManager nm;
|
||||||
|
protected NodeManager rebootedNodeManager;
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() {
|
public void tearDown() {
|
||||||
|
@ -496,8 +498,28 @@ public class TestNodeStatusUpdater {
|
||||||
LOG.info("Waiting for NM to stop..");
|
LOG.info("Waiting for NM to stop..");
|
||||||
Thread.sleep(1000);
|
Thread.sleep(1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
|
||||||
|
|
||||||
|
waitCount = 0;
|
||||||
|
while (null == rebootedNodeManager && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to reinitialize..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
waitCount = 0;
|
||||||
|
while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to start..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
|
||||||
|
|
||||||
|
rebootedNodeManager.stop();
|
||||||
|
waitCount = 0;
|
||||||
|
while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
|
||||||
|
LOG.info("Waiting for NM to stop..");
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -642,6 +664,12 @@ public class TestNodeStatusUpdater {
|
||||||
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
|
||||||
return myNodeStatusUpdater;
|
return myNodeStatusUpdater;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
NodeManager createNewNodeManager() {
|
||||||
|
rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
|
||||||
|
return rebootedNodeManager;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue