YARN-495. Changed NM reboot behaviour to be a simple resync - kill all containers and re-register with RM. Contributed by Jian He.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1466752 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-04-11 02:00:47 +00:00
parent add00d6d74
commit 2e3b56f6e9
14 changed files with 230 additions and 157 deletions

View File

@ -135,6 +135,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-479. NM retry behavior for connection to RM should be similar for
lost heartbeats (Jian He via bikas)
YARN-495. Changed NM reboot behaviour to be a simple resync - kill all
containers and re-register with RM. (Jian He via vinodkv)
OPTIMIZATIONS
BUG FIXES

View File

@ -24,5 +24,5 @@ package org.apache.hadoop.yarn.server.api.records;
*/
public enum NodeAction {
NORMAL, REBOOT, SHUTDOWN
NORMAL, RESYNC, SHUTDOWN
}

View File

@ -25,7 +25,7 @@ import "yarn_protos.proto";
enum NodeActionProto {
NORMAL = 0;
REBOOT = 1;
RESYNC = 1;
SHUTDOWN = 2;
}

View File

@ -81,6 +81,7 @@ public class NodeManager extends CompositeService
private Context context;
private AsyncDispatcher dispatcher;
private ContainerManagerImpl containerManager;
private NodeStatusUpdater nodeStatusUpdater;
private static CompositeServiceShutdownHook nodeManagerShutdownHook;
private long waitForContainersOnShutdownMillis;
@ -163,7 +164,7 @@ public class NodeManager extends CompositeService
addService(nodeHealthChecker);
dirsHandler = nodeHealthChecker.getDiskHandler();
NodeStatusUpdater nodeStatusUpdater =
nodeStatusUpdater =
createNodeStatusUpdater(context, dispatcher, nodeHealthChecker);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@ -214,35 +215,67 @@ public class NodeManager extends CompositeService
if (isStopping.getAndSet(true)) {
return;
}
cleanupContainers();
cleanupContainers(NodeManagerEventType.SHUTDOWN);
super.stop();
DefaultMetricsSystem.shutdown();
}
protected void cleanupContainersOnResync() {
//we do not want to block dispatcher thread here
new Thread() {
@Override
public void run() {
cleanupContainers(NodeManagerEventType.RESYNC);
((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater();
}
}.start();
}
@SuppressWarnings("unchecked")
protected void cleanupContainers() {
protected void cleanupContainers(NodeManagerEventType eventType) {
Map<ContainerId, Container> containers = context.getContainers();
if (containers.isEmpty()) {
return;
}
LOG.info("Containers still running on shutdown: " + containers.keySet());
LOG.info("Containers still running on " + eventType + " : "
+ containers.keySet());
List<ContainerId> containerIds = new ArrayList<ContainerId>(containers.keySet());
List<ContainerId> containerIds =
new ArrayList<ContainerId>(containers.keySet());
dispatcher.getEventHandler().handle(
new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN));
LOG.info("Waiting for containers to be killed");
long waitStartTime = System.currentTimeMillis();
while (!containers.isEmpty() &&
System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill", ex);
switch (eventType) {
case SHUTDOWN:
long waitStartTime = System.currentTimeMillis();
while (!containers.isEmpty()
&& System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) {
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on shutdown",
ex);
}
}
break;
case RESYNC:
while (!containers.isEmpty()) {
try {
Thread.sleep(1000);
//to remove done containers from the map
nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while sleeping on container kill on resync",
ex);
}
}
break;
default:
LOG.warn("Invalid eventType: " + eventType);
}
// All containers killed
@ -342,9 +375,8 @@ public class NodeManager extends CompositeService
case SHUTDOWN:
stop();
break;
case REBOOT:
stop();
reboot();
case RESYNC:
cleanupContainersOnResync();
break;
default:
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
@ -361,6 +393,11 @@ public class NodeManager extends CompositeService
return containerManager;
}
//For testing
Dispatcher getNMDispatcher(){
return dispatcher;
}
@VisibleForTesting
Context getNMContext() {
return this.context;

View File

@ -18,5 +18,5 @@
package org.apache.hadoop.yarn.server.nodemanager;
public enum NodeManagerEventType {
SHUTDOWN, REBOOT
SHUTDOWN, RESYNC
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.yarn.server.nodemanager;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.service.Service;
public interface NodeStatusUpdater extends Service {
void sendOutofBandHeartBeat();
NodeStatus getNodeStatusAndUpdateContainersInContext();
}

View File

@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.service.AbstractService;
import com.google.common.annotations.VisibleForTesting;
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
@ -91,6 +93,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
private long rmConnectionRetryIntervalMS;
private boolean waitForEver;
private Runnable statusUpdaterRunnable;
private Thread statusUpdater;
public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(NodeStatusUpdaterImpl.class.getName());
@ -169,6 +174,22 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
this.isStopped = true;
super.stop();
}
protected void rebootNodeStatusUpdater() {
// Interrupt the updater.
this.isStopped = true;
try {
statusUpdater.join();
registerWithRM();
statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater");
this.isStopped = false;
statusUpdater.start();
LOG.info("NodeStatusUpdater thread is reRegistered and restarted");
} catch (Exception e) {
throw new AvroRuntimeException(e);
}
}
private boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
@ -188,7 +209,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
conf);
}
private void registerWithRM() throws YarnRemoteException {
@VisibleForTesting
protected void registerWithRM() throws YarnRemoteException {
Configuration conf = getConfig();
rmConnectWaitMS =
conf.getInt(
@ -312,7 +334,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
return appList;
}
private NodeStatus getNodeStatus() {
public NodeStatus getNodeStatusAndUpdateContainersInContext() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
nodeStatus.setNodeId(this.nodeId);
@ -387,7 +409,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
protected void startStatusUpdater() {
new Thread("Node Status Updater") {
statusUpdaterRunnable = new Runnable() {
@Override
@SuppressWarnings("unchecked")
public void run() {
@ -398,7 +420,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
NodeHeartbeatResponse response = null;
int rmRetryCount = 0;
long waitStartTime = System.currentTimeMillis();
NodeStatus nodeStatus = getNodeStatus();
NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
nodeStatus.setResponseId(lastHeartBeatID);
NodeHeartbeatRequest request = recordFactory
@ -453,11 +475,11 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
break;
}
if (response.getNodeAction() == NodeAction.REBOOT) {
if (response.getNodeAction() == NodeAction.RESYNC) {
LOG.info("Node is out of sync with ResourceManager,"
+ " hence rebooting.");
dispatcher.getEventHandler().handle(
new NodeManagerEvent(NodeManagerEventType.REBOOT));
new NodeManagerEvent(NodeManagerEventType.RESYNC));
break;
}
@ -500,6 +522,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
}
}
}
}.start();
};
statusUpdater =
new Thread(statusUpdaterRunnable, "Node Status Updater");
statusUpdater.start();
}
}

View File

@ -160,7 +160,10 @@ public class TestNodeManagerReboot {
"container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(),
ResourceLocalizationService.NM_PRIVATE_DIR) > 0);
nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT));
// restart the NodeManager
nm.stop();
nm = new MyNodeManager();
nm.start();
numTries = 0;
while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer
@ -250,26 +253,6 @@ public class TestNodeManagerReboot {
return delService;
}
// mimic part of reboot process
@Override
public void handle(NodeManagerEvent event) {
switch (event.getType()) {
case SHUTDOWN:
this.stop();
break;
case REBOOT:
this.stop();
this.createNewMyNodeManager().start();
break;
default:
LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring.");
}
}
private MyNodeManager createNewMyNodeManager() {
return new MyNodeManager();
}
private YarnConfiguration createNMConfig() {
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB

View File

@ -28,6 +28,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
import junit.framework.Assert;
@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After;
import org.junit.Before;
@ -71,6 +77,7 @@ public class TestNodeManagerShutdown {
.getRecordFactory(null);
static final String user = "nobody";
private FileContext localFS;
private CyclicBarrier syncBarrier = new CyclicBarrier(2);
@Before
public void setup() throws UnsupportedFileSystemException {
@ -91,52 +98,7 @@ public class TestNodeManagerShutdown {
NodeManager nm = getNodeManager();
nm.init(createNMConfig());
nm.start();
ContainerManagerImpl containerManager = nm.getContainerManager();
File scriptFile = createUnhaltingScriptFile();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// Construct the Container-id
ContainerId cId = createContainerId();
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(user);
URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource localResource =
recordFactory.newRecordInstance(LocalResource.class);
localResource.setResource(localResourceUri);
localResource.setSize(-1);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setType(LocalResourceType.FILE);
localResource.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, localResource);
containerLaunchContext.setLocalResources(localResources);
containerLaunchContext.setUser(containerLaunchContext.getUser());
List<String> commands = new ArrayList<String>();
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(1024);
StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
containerManager.startContainer(startRequest);
GetContainerStatusRequest request =
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(cId);
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
startContainers(nm);
final int MAX_TRIES=20;
int numTries = 0;
@ -170,6 +132,74 @@ public class TestNodeManagerShutdown {
reader.close();
}
@SuppressWarnings("unchecked")
@Test
public void testKillContainersOnResync() throws IOException, InterruptedException {
NodeManager nm = new TestNodeManager();
YarnConfiguration conf = createNMConfig();
nm.init(conf);
nm.start();
startContainers(nm);
assert ((TestNodeManager) nm).getNMRegistrationCount() == 1;
nm.getNMDispatcher().getEventHandler().
handle( new NodeManagerEvent(NodeManagerEventType.RESYNC));
try {
syncBarrier.await();
} catch (BrokenBarrierException e) {
}
assert ((TestNodeManager) nm).getNMRegistrationCount() == 2;
}
private void startContainers(NodeManager nm) throws IOException {
ContainerManagerImpl containerManager = nm.getContainerManager();
File scriptFile = createUnhaltingScriptFile();
ContainerLaunchContext containerLaunchContext =
recordFactory.newRecordInstance(ContainerLaunchContext.class);
// Construct the Container-id
ContainerId cId = createContainerId();
containerLaunchContext.setContainerId(cId);
containerLaunchContext.setUser(user);
URL localResourceUri =
ConverterUtils.getYarnUrlFromPath(localFS
.makeQualified(new Path(scriptFile.getAbsolutePath())));
LocalResource localResource =
recordFactory.newRecordInstance(LocalResource.class);
localResource.setResource(localResourceUri);
localResource.setSize(-1);
localResource.setVisibility(LocalResourceVisibility.APPLICATION);
localResource.setType(LocalResourceType.FILE);
localResource.setTimestamp(scriptFile.lastModified());
String destinationFile = "dest_file";
Map<String, LocalResource> localResources =
new HashMap<String, LocalResource>();
localResources.put(destinationFile, localResource);
containerLaunchContext.setLocalResources(localResources);
containerLaunchContext.setUser(containerLaunchContext.getUser());
List<String> commands = new ArrayList<String>();
commands.add("/bin/bash");
commands.add(scriptFile.getAbsolutePath());
containerLaunchContext.setCommands(commands);
containerLaunchContext.setResource(recordFactory
.newRecordInstance(Resource.class));
containerLaunchContext.getResource().setMemory(1024);
StartContainerRequest startRequest =
recordFactory.newRecordInstance(StartContainerRequest.class);
startRequest.setContainerLaunchContext(containerLaunchContext);
containerManager.startContainer(startRequest);
GetContainerStatusRequest request =
recordFactory.newRecordInstance(GetContainerStatusRequest.class);
request.setContainerId(cId);
ContainerStatus containerStatus =
containerManager.getContainerStatus(request).getStatus();
Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState());
}
private ContainerId createContainerId() {
ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class);
appId.setClusterTimestamp(0);
@ -226,4 +256,48 @@ public class TestNodeManagerShutdown {
}
};
}
class TestNodeManager extends NodeManager {
private int registrationCount = 0;
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
return new TestNodeStatusUpdaterImpl(context, dispatcher,
healthChecker, metrics);
}
public int getNMRegistrationCount() {
return registrationCount;
}
class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater {
public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
super(context, dispatcher, healthChecker, metrics);
}
@Override
protected void registerWithRM() throws YarnRemoteException {
super.registerWithRM();
registrationCount++;
}
@Override
protected void rebootNodeStatusUpdater() {
ConcurrentMap<ContainerId, Container> containers =
getNMContext().getContainers();
// ensure that containers are empty before restart nodeStatusUpdater
Assert.assertTrue(containers.isEmpty());
super.rebootNodeStatusUpdater();
try {
syncBarrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
}
}

View File

@ -99,7 +99,6 @@ public class TestNodeStatusUpdater {
private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
private final Configuration conf = createNMConfig();
private NodeManager nm;
protected NodeManager rebootedNodeManager;
private boolean containerStatusBackupSuccessfully = true;
private List<ContainerStatus> completedContainerStatusList = new ArrayList<ContainerStatus>();
@ -663,8 +662,8 @@ public class TestNodeStatusUpdater {
}
@Override
protected void cleanupContainers() {
super.cleanupContainers();
protected void cleanupContainers(NodeManagerEventType eventType) {
super.cleanupContainers(NodeManagerEventType.SHUTDOWN);
numCleanups.incrementAndGet();
}
};
@ -717,50 +716,6 @@ public class TestNodeStatusUpdater {
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
}
@Test
public void testNodeReboot() throws Exception {
nm = getNodeManager(NodeAction.REBOOT);
YarnConfiguration conf = createNMConfig();
nm.init(conf);
Assert.assertEquals(STATE.INITED, nm.getServiceState());
nm.start();
int waitCount = 0;
while (heartBeatID < 1 && waitCount++ != 20) {
Thread.sleep(500);
}
Assert.assertFalse(heartBeatID < 1);
// NM takes a while to reach the STOPPED state.
waitCount = 0;
while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
waitCount = 0;
while (null == rebootedNodeManager && waitCount++ != 20) {
LOG.info("Waiting for NM to reinitialize..");
Thread.sleep(1000);
}
waitCount = 0;
while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) {
LOG.info("Waiting for NM to start..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState());
rebootedNodeManager.stop();
waitCount = 0;
while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
LOG.info("Waiting for NM to stop..");
Thread.sleep(1000);
}
Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState());
}
@Test
public void testNMShutdownForRegistrationFailure() {
@ -1108,12 +1063,6 @@ public class TestNodeStatusUpdater {
myNodeStatusUpdater.resourceTracker = myResourceTracker2;
return myNodeStatusUpdater;
}
@Override
NodeManager createNewNodeManager() {
rebootedNodeManager = getNodeManager(NodeAction.NORMAL);
return rebootedNodeManager;
}
};
}
}

View File

@ -73,13 +73,13 @@ public class ResourceTrackerService extends AbstractService implements
private Server server;
private InetSocketAddress resourceTrackerAddress;
private static final NodeHeartbeatResponse reboot = recordFactory
private static final NodeHeartbeatResponse resync = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
private static final NodeHeartbeatResponse shutDown = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
static {
reboot.setNodeAction(NodeAction.REBOOT);
resync.setNodeAction(NodeAction.RESYNC);
shutDown.setNodeAction(NodeAction.SHUTDOWN);
}
@ -220,7 +220,7 @@ public class ResourceTrackerService extends AbstractService implements
if (rmNode == null) {
/* node does not exist */
LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId());
return reboot;
return resync;
}
// Send ping
@ -250,7 +250,7 @@ public class ResourceTrackerService extends AbstractService implements
// TODO: Just sending reboot is not enough. Think more.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));
return reboot;
return resync;
}
// Heartbeat response

View File

@ -225,9 +225,9 @@ public class TestRMRestart {
// NM should be rebooted on heartbeat, even first heartbeat for nm2
NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true);
Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction());
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
// new NM to represent NM re-register
nm1 = rm2.registerNode("h1:1234", 15120);
@ -235,9 +235,9 @@ public class TestRMRestart {
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
hbResponse = nm2.nodeHeartbeat(true);
Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction());
Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction());
// assert app1 attempt is saved
attempt1 = loadedApp1.getCurrentAppAttempt();

View File

@ -282,7 +282,7 @@ public class TestResourceTrackerService {
nodeHeartbeat = nm2.nodeHeartbeat(
new HashMap<ApplicationId, List<ContainerStatus>>(), true, -100);
Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction()));
Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction()));
checkRebootedNMCount(rm, ++initialMetricCount);
}

View File

@ -130,6 +130,6 @@ public class TestRMNMRPCResponseId {
nodeStatus.setResponseId(0);
response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest);
Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction()));
Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction()));
}
}