From 81a684ebd312b417bd1646cda7cd73d745d7d85e Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Thu, 11 Apr 2013 02:01:44 +0000 Subject: [PATCH] YARN-495. Changed NM reboot behaviour to be a simple resync - kill all containers and re-register with RM. Contributed by Jian He. svn merge --ignore-ancestry -c 1466752 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1466753 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/server/api/records/NodeAction.java | 2 +- .../proto/yarn_server_common_protos.proto | 2 +- .../yarn/server/nodemanager/NodeManager.java | 71 ++++++-- .../nodemanager/NodeManagerEventType.java | 2 +- .../server/nodemanager/NodeStatusUpdater.java | 2 + .../nodemanager/NodeStatusUpdaterImpl.java | 39 +++- .../nodemanager/TestNodeManagerReboot.java | 25 +-- .../nodemanager/TestNodeManagerShutdown.java | 166 +++++++++++++----- .../nodemanager/TestNodeStatusUpdater.java | 55 +----- .../ResourceTrackerService.java | 8 +- .../server/resourcemanager/TestRMRestart.java | 8 +- .../TestResourceTrackerService.java | 2 +- .../TestRMNMRPCResponseId.java | 2 +- 14 files changed, 230 insertions(+), 157 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0c92640c50f..a60b06ea032 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -71,6 +71,9 @@ Release 2.0.5-beta - UNRELEASED YARN-479. NM retry behavior for connection to RM should be similar for lost heartbeats (Jian He via bikas) + YARN-495. Changed NM reboot behaviour to be a simple resync - kill all + containers and re-register with RM. (Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java index 4d8246e7127..652c05fdbc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeAction.java @@ -24,5 +24,5 @@ */ public enum NodeAction { - NORMAL, REBOOT, SHUTDOWN + NORMAL, RESYNC, SHUTDOWN } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index 89ec81c3ab9..7fa1fb74030 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -25,7 +25,7 @@ import "yarn_protos.proto"; enum NodeActionProto { NORMAL = 0; - REBOOT = 1; + RESYNC = 1; SHUTDOWN = 2; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 867a02d2388..a5d16c568c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -81,6 +81,7 @@ public class NodeManager extends CompositeService private Context context; private AsyncDispatcher dispatcher; private ContainerManagerImpl containerManager; + private NodeStatusUpdater nodeStatusUpdater; private static CompositeServiceShutdownHook nodeManagerShutdownHook; private long waitForContainersOnShutdownMillis; @@ -163,7 +164,7 @@ public void init(Configuration conf) { addService(nodeHealthChecker); dirsHandler = nodeHealthChecker.getDiskHandler(); - NodeStatusUpdater nodeStatusUpdater = + nodeStatusUpdater = createNodeStatusUpdater(context, dispatcher, nodeHealthChecker); NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor(); @@ -214,35 +215,67 @@ public void stop() { if (isStopping.getAndSet(true)) { return; } - - cleanupContainers(); + + cleanupContainers(NodeManagerEventType.SHUTDOWN); super.stop(); DefaultMetricsSystem.shutdown(); } - + + protected void cleanupContainersOnResync() { + //we do not want to block dispatcher thread here + new Thread() { + @Override + public void run() { + cleanupContainers(NodeManagerEventType.RESYNC); + ((NodeStatusUpdaterImpl) nodeStatusUpdater ).rebootNodeStatusUpdater(); + } + }.start(); + } + @SuppressWarnings("unchecked") - protected void cleanupContainers() { + protected void cleanupContainers(NodeManagerEventType eventType) { Map containers = context.getContainers(); if (containers.isEmpty()) { return; } - LOG.info("Containers still running on shutdown: " + containers.keySet()); + LOG.info("Containers still running on " + eventType + " : " + + containers.keySet()); - List containerIds = new ArrayList(containers.keySet()); + List containerIds = + new ArrayList(containers.keySet()); dispatcher.getEventHandler().handle( new CMgrCompletedContainersEvent(containerIds, CMgrCompletedContainersEvent.Reason.ON_SHUTDOWN)); LOG.info("Waiting for containers to be killed"); - long waitStartTime = System.currentTimeMillis(); - while (!containers.isEmpty() && - System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { - try { - Thread.sleep(1000); - } catch (InterruptedException ex) { - LOG.warn("Interrupted while sleeping on container kill", ex); + switch (eventType) { + case SHUTDOWN: + long waitStartTime = System.currentTimeMillis(); + while (!containers.isEmpty() + && System.currentTimeMillis() - waitStartTime < waitForContainersOnShutdownMillis) { + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on shutdown", + ex); + } } + break; + case RESYNC: + while (!containers.isEmpty()) { + try { + Thread.sleep(1000); + //to remove done containers from the map + nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext(); + } catch (InterruptedException ex) { + LOG.warn("Interrupted while sleeping on container kill on resync", + ex); + } + } + break; + default: + LOG.warn("Invalid eventType: " + eventType); } // All containers killed @@ -342,9 +375,8 @@ public void handle(NodeManagerEvent event) { case SHUTDOWN: stop(); break; - case REBOOT: - stop(); - reboot(); + case RESYNC: + cleanupContainersOnResync(); break; default: LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); @@ -361,6 +393,11 @@ ContainerManagerImpl getContainerManager() { return containerManager; } + //For testing + Dispatcher getNMDispatcher(){ + return dispatcher; + } + @VisibleForTesting Context getNMContext() { return this.context; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java index d18cec6c0fb..f4d1caad789 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManagerEventType.java @@ -18,5 +18,5 @@ package org.apache.hadoop.yarn.server.nodemanager; public enum NodeManagerEventType { - SHUTDOWN, REBOOT + SHUTDOWN, RESYNC } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index f1e6ac3bf4c..41949e7baab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -18,9 +18,11 @@ package org.apache.hadoop.yarn.server.nodemanager; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.service.Service; public interface NodeStatusUpdater extends Service { void sendOutofBandHeartBeat(); + NodeStatus getNodeStatusAndUpdateContainersInContext(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index cca296cd15d..e9583c2a2e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.service.AbstractService; +import com.google.common.annotations.VisibleForTesting; + public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatusUpdater { @@ -91,6 +93,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private long rmConnectionRetryIntervalMS; private boolean waitForEver; + private Runnable statusUpdaterRunnable; + private Thread statusUpdater; + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -169,6 +174,22 @@ public synchronized void stop() { this.isStopped = true; super.stop(); } + + protected void rebootNodeStatusUpdater() { + // Interrupt the updater. + this.isStopped = true; + + try { + statusUpdater.join(); + registerWithRM(); + statusUpdater = new Thread(statusUpdaterRunnable, "Node Status Updater"); + this.isStopped = false; + statusUpdater.start(); + LOG.info("NodeStatusUpdater thread is reRegistered and restarted"); + } catch (Exception e) { + throw new AvroRuntimeException(e); + } + } private boolean isSecurityEnabled() { return UserGroupInformation.isSecurityEnabled(); @@ -188,7 +209,8 @@ protected ResourceTracker getRMClient() { conf); } - private void registerWithRM() throws YarnRemoteException { + @VisibleForTesting + protected void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); rmConnectWaitMS = conf.getInt( @@ -312,7 +334,7 @@ private List createKeepAliveApplicationList() { return appList; } - private NodeStatus getNodeStatus() { + public NodeStatus getNodeStatusAndUpdateContainersInContext() { NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); @@ -387,7 +409,7 @@ public void sendOutofBandHeartBeat() { protected void startStatusUpdater() { - new Thread("Node Status Updater") { + statusUpdaterRunnable = new Runnable() { @Override @SuppressWarnings("unchecked") public void run() { @@ -398,7 +420,7 @@ public void run() { NodeHeartbeatResponse response = null; int rmRetryCount = 0; long waitStartTime = System.currentTimeMillis(); - NodeStatus nodeStatus = getNodeStatus(); + NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext(); nodeStatus.setResponseId(lastHeartBeatID); NodeHeartbeatRequest request = recordFactory @@ -453,11 +475,11 @@ public void run() { new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); break; } - if (response.getNodeAction() == NodeAction.REBOOT) { + if (response.getNodeAction() == NodeAction.RESYNC) { LOG.info("Node is out of sync with ResourceManager," + " hence rebooting."); dispatcher.getEventHandler().handle( - new NodeManagerEvent(NodeManagerEventType.REBOOT)); + new NodeManagerEvent(NodeManagerEventType.RESYNC)); break; } @@ -500,6 +522,9 @@ public void run() { } } } - }.start(); + }; + statusUpdater = + new Thread(statusUpdaterRunnable, "Node Status Updater"); + statusUpdater.start(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java index 10a85c74804..9ac237b6614 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java @@ -160,7 +160,10 @@ public void testClearLocalDirWhenNodeReboot() throws IOException { "container is launched", numOfLocalDirs(nmLocalDir.getAbsolutePath(), ResourceLocalizationService.NM_PRIVATE_DIR) > 0); - nm.handle(new NodeManagerEvent(NodeManagerEventType.REBOOT)); + // restart the NodeManager + nm.stop(); + nm = new MyNodeManager(); + nm.start(); numTries = 0; while ((numOfLocalDirs(nmLocalDir.getAbsolutePath(), ContainerLocalizer @@ -250,26 +253,6 @@ protected DeletionService createDeletionService(ContainerExecutor exec) { return delService; } - // mimic part of reboot process - @Override - public void handle(NodeManagerEvent event) { - switch (event.getType()) { - case SHUTDOWN: - this.stop(); - break; - case REBOOT: - this.stop(); - this.createNewMyNodeManager().start(); - break; - default: - LOG.warn("Invalid shutdown event " + event.getType() + ". Ignoring."); - } - } - - private MyNodeManager createNewMyNodeManager() { - return new MyNodeManager(); - } - private YarnConfiguration createNMConfig() { YarnConfiguration conf = new YarnConfiguration(); conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java index f42261765fb..72f3433c027 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java @@ -28,6 +28,9 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CyclicBarrier; import junit.framework.Assert; @@ -49,9 +52,12 @@ import org.apache.hadoop.yarn.api.records.URL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; +import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.util.ConverterUtils; import org.junit.After; import org.junit.Before; @@ -71,6 +77,7 @@ public class TestNodeManagerShutdown { .getRecordFactory(null); static final String user = "nobody"; private FileContext localFS; + private CyclicBarrier syncBarrier = new CyclicBarrier(2); @Before public void setup() throws UnsupportedFileSystemException { @@ -91,52 +98,7 @@ public void testKillContainersOnShutdown() throws IOException { NodeManager nm = getNodeManager(); nm.init(createNMConfig()); nm.start(); - - ContainerManagerImpl containerManager = nm.getContainerManager(); - File scriptFile = createUnhaltingScriptFile(); - - ContainerLaunchContext containerLaunchContext = - recordFactory.newRecordInstance(ContainerLaunchContext.class); - - // Construct the Container-id - ContainerId cId = createContainerId(); - containerLaunchContext.setContainerId(cId); - - containerLaunchContext.setUser(user); - - URL localResourceUri = - ConverterUtils.getYarnUrlFromPath(localFS - .makeQualified(new Path(scriptFile.getAbsolutePath()))); - LocalResource localResource = - recordFactory.newRecordInstance(LocalResource.class); - localResource.setResource(localResourceUri); - localResource.setSize(-1); - localResource.setVisibility(LocalResourceVisibility.APPLICATION); - localResource.setType(LocalResourceType.FILE); - localResource.setTimestamp(scriptFile.lastModified()); - String destinationFile = "dest_file"; - Map localResources = - new HashMap(); - localResources.put(destinationFile, localResource); - containerLaunchContext.setLocalResources(localResources); - containerLaunchContext.setUser(containerLaunchContext.getUser()); - List commands = new ArrayList(); - commands.add("/bin/bash"); - commands.add(scriptFile.getAbsolutePath()); - containerLaunchContext.setCommands(commands); - containerLaunchContext.setResource(recordFactory - .newRecordInstance(Resource.class)); - containerLaunchContext.getResource().setMemory(1024); - StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class); - startRequest.setContainerLaunchContext(containerLaunchContext); - containerManager.startContainer(startRequest); - - GetContainerStatusRequest request = - recordFactory.newRecordInstance(GetContainerStatusRequest.class); - request.setContainerId(cId); - ContainerStatus containerStatus = - containerManager.getContainerStatus(request).getStatus(); - Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); + startContainers(nm); final int MAX_TRIES=20; int numTries = 0; @@ -170,6 +132,74 @@ public void testKillContainersOnShutdown() throws IOException { reader.close(); } + @SuppressWarnings("unchecked") + @Test + public void testKillContainersOnResync() throws IOException, InterruptedException { + NodeManager nm = new TestNodeManager(); + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + startContainers(nm); + + assert ((TestNodeManager) nm).getNMRegistrationCount() == 1; + nm.getNMDispatcher().getEventHandler(). + handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + assert ((TestNodeManager) nm).getNMRegistrationCount() == 2; + } + + private void startContainers(NodeManager nm) throws IOException { + ContainerManagerImpl containerManager = nm.getContainerManager(); + File scriptFile = createUnhaltingScriptFile(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + // Construct the Container-id + ContainerId cId = createContainerId(); + containerLaunchContext.setContainerId(cId); + + containerLaunchContext.setUser(user); + + URL localResourceUri = + ConverterUtils.getYarnUrlFromPath(localFS + .makeQualified(new Path(scriptFile.getAbsolutePath()))); + LocalResource localResource = + recordFactory.newRecordInstance(LocalResource.class); + localResource.setResource(localResourceUri); + localResource.setSize(-1); + localResource.setVisibility(LocalResourceVisibility.APPLICATION); + localResource.setType(LocalResourceType.FILE); + localResource.setTimestamp(scriptFile.lastModified()); + String destinationFile = "dest_file"; + Map localResources = + new HashMap(); + localResources.put(destinationFile, localResource); + containerLaunchContext.setLocalResources(localResources); + containerLaunchContext.setUser(containerLaunchContext.getUser()); + List commands = new ArrayList(); + commands.add("/bin/bash"); + commands.add(scriptFile.getAbsolutePath()); + containerLaunchContext.setCommands(commands); + containerLaunchContext.setResource(recordFactory + .newRecordInstance(Resource.class)); + containerLaunchContext.getResource().setMemory(1024); + StartContainerRequest startRequest = + recordFactory.newRecordInstance(StartContainerRequest.class); + startRequest.setContainerLaunchContext(containerLaunchContext); + containerManager.startContainer(startRequest); + + GetContainerStatusRequest request = + recordFactory.newRecordInstance(GetContainerStatusRequest.class); + request.setContainerId(cId); + ContainerStatus containerStatus = + containerManager.getContainerStatus(request).getStatus(); + Assert.assertEquals(ContainerState.RUNNING, containerStatus.getState()); + } + private ContainerId createContainerId() { ApplicationId appId = recordFactory.newRecordInstance(ApplicationId.class); appId.setClusterTimestamp(0); @@ -226,4 +256,48 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } }; } + + class TestNodeManager extends NodeManager { + + private int registrationCount = 0; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterImpl(context, dispatcher, + healthChecker, metrics); + } + + public int getNMRegistrationCount() { + return registrationCount; + } + + class TestNodeStatusUpdaterImpl extends MockNodeStatusUpdater { + + public TestNodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + + @Override + protected void registerWithRM() throws YarnRemoteException { + super.registerWithRM(); + registrationCount++; + } + + @Override + protected void rebootNodeStatusUpdater() { + ConcurrentMap containers = + getNMContext().getContainers(); + // ensure that containers are empty before restart nodeStatusUpdater + Assert.assertTrue(containers.isEmpty()); + super.rebootNodeStatusUpdater(); + try { + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index b0f3093f510..c06a54dc118 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -99,7 +99,6 @@ public class TestNodeStatusUpdater { private final List registeredNodes = new ArrayList(); private final Configuration conf = createNMConfig(); private NodeManager nm; - protected NodeManager rebootedNodeManager; private boolean containerStatusBackupSuccessfully = true; private List completedContainerStatusList = new ArrayList(); @@ -663,8 +662,8 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, } @Override - protected void cleanupContainers() { - super.cleanupContainers(); + protected void cleanupContainers(NodeManagerEventType eventType) { + super.cleanupContainers(NodeManagerEventType.SHUTDOWN); numCleanups.incrementAndGet(); } }; @@ -717,50 +716,6 @@ public void testNodeDecommision() throws Exception { Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); } - @Test - public void testNodeReboot() throws Exception { - nm = getNodeManager(NodeAction.REBOOT); - YarnConfiguration conf = createNMConfig(); - nm.init(conf); - Assert.assertEquals(STATE.INITED, nm.getServiceState()); - nm.start(); - - int waitCount = 0; - while (heartBeatID < 1 && waitCount++ != 20) { - Thread.sleep(500); - } - Assert.assertFalse(heartBeatID < 1); - - // NM takes a while to reach the STOPPED state. - waitCount = 0; - while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STOPPED, nm.getServiceState()); - - waitCount = 0; - while (null == rebootedNodeManager && waitCount++ != 20) { - LOG.info("Waiting for NM to reinitialize.."); - Thread.sleep(1000); - } - - waitCount = 0; - while (rebootedNodeManager.getServiceState() != STATE.STARTED && waitCount++ != 20) { - LOG.info("Waiting for NM to start.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STARTED, rebootedNodeManager.getServiceState()); - - rebootedNodeManager.stop(); - waitCount = 0; - while (rebootedNodeManager.getServiceState() != STATE.STOPPED && waitCount++ != 20) { - LOG.info("Waiting for NM to stop.."); - Thread.sleep(1000); - } - Assert.assertEquals(STATE.STOPPED, rebootedNodeManager.getServiceState()); - } - @Test public void testNMShutdownForRegistrationFailure() { @@ -1108,12 +1063,6 @@ protected NodeStatusUpdater createNodeStatusUpdater(Context context, myNodeStatusUpdater.resourceTracker = myResourceTracker2; return myNodeStatusUpdater; } - - @Override - NodeManager createNewNodeManager() { - rebootedNodeManager = getNodeManager(NodeAction.NORMAL); - return rebootedNodeManager; - } }; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java index 27f74d4f740..258c7dc0e47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java @@ -73,13 +73,13 @@ public class ResourceTrackerService extends AbstractService implements private Server server; private InetSocketAddress resourceTrackerAddress; - private static final NodeHeartbeatResponse reboot = recordFactory + private static final NodeHeartbeatResponse resync = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); private static final NodeHeartbeatResponse shutDown = recordFactory .newRecordInstance(NodeHeartbeatResponse.class); static { - reboot.setNodeAction(NodeAction.REBOOT); + resync.setNodeAction(NodeAction.RESYNC); shutDown.setNodeAction(NodeAction.SHUTDOWN); } @@ -220,7 +220,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) if (rmNode == null) { /* node does not exist */ LOG.info("Node not found rebooting " + remoteNodeStatus.getNodeId()); - return reboot; + return resync; } // Send ping @@ -250,7 +250,7 @@ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) // TODO: Just sending reboot is not enough. Think more. this.rmContext.getDispatcher().getEventHandler().handle( new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING)); - return reboot; + return resync; } // Heartbeat response diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java index d19879c0a3b..78adf79eba0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java @@ -225,9 +225,9 @@ public void testRMRestart() throws Exception { // NM should be rebooted on heartbeat, even first heartbeat for nm2 NodeHeartbeatResponse hbResponse = nm1.nodeHeartbeat(true); - Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); hbResponse = nm2.nodeHeartbeat(true); - Assert.assertEquals(NodeAction.REBOOT, hbResponse.getNodeAction()); + Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction()); // new NM to represent NM re-register nm1 = rm2.registerNode("h1:1234", 15120); @@ -235,9 +235,9 @@ public void testRMRestart() throws Exception { // verify no more reboot response sent hbResponse = nm1.nodeHeartbeat(true); - Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction()); + Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction()); hbResponse = nm2.nodeHeartbeat(true); - Assert.assertTrue(NodeAction.REBOOT != hbResponse.getNodeAction()); + Assert.assertTrue(NodeAction.RESYNC != hbResponse.getNodeAction()); // assert app1 attempt is saved attempt1 = loadedApp1.getCurrentAppAttempt(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 61ed065fb61..af9d5d2c0bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -282,7 +282,7 @@ public void testReboot() throws Exception { nodeHeartbeat = nm2.nodeHeartbeat( new HashMap>(), true, -100); - Assert.assertTrue(NodeAction.REBOOT.equals(nodeHeartbeat.getNodeAction())); + Assert.assertTrue(NodeAction.RESYNC.equals(nodeHeartbeat.getNodeAction())); checkRebootedNMCount(rm, ++initialMetricCount); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java index 984d7cdfcf5..1fd1b2c9d36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/resourcetracker/TestRMNMRPCResponseId.java @@ -130,6 +130,6 @@ public void testRPCResponseId() throws IOException { nodeStatus.setResponseId(0); response = resourceTrackerService.nodeHeartbeat(nodeHeartBeatRequest); - Assert.assertTrue(NodeAction.REBOOT.equals(response.getNodeAction())); + Assert.assertTrue(NodeAction.RESYNC.equals(response.getNodeAction())); } } \ No newline at end of file