From 0aa44dc7ac7a98d99b78adf5cbdb9d1304c23d05 Mon Sep 17 00:00:00 2001 From: Bikas Saha Date: Mon, 8 Apr 2013 19:19:33 +0000 Subject: [PATCH] Merge -c 1465731 from trunk to branch-2 for YARN-479. NM retry behavior for connection to RM should be similar for lost heartbeats (Jian He via bikas) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1465733 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../nodemanager/NodeStatusUpdaterImpl.java | 66 ++++++++---- .../nodemanager/TestNodeStatusUpdater.java | 101 +++++++++++++++++- 3 files changed, 144 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index aff677d4421..e7f0913008b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -68,6 +68,9 @@ Release 2.0.5-beta - UNRELEASED YARN-193. Scheduler.normalizeRequest does not account for allocation requests that exceed maximumAllocation limits (Zhijie Shen via bikas) + YARN-479. NM retry behavior for connection to RM should be similar for + lost heartbeats (Jian He via bikas) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8d424bf3449..cca296cd15d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -87,10 +87,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; - - private boolean previousHeartBeatSucceeded; - private List previousContainersStatuses = - new ArrayList(); + private long rmConnectWaitMS; + private long rmConnectionRetryIntervalMS; + private boolean waitForEver; public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { @@ -99,7 +98,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; - this.previousHeartBeatSucceeded = true; } @Override @@ -137,8 +135,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS); LOG.info("Initialized nodemanager for " + nodeId + ":" + - " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb + - " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores); + " physical-memory=" + memoryMb + " virtual-memory=" + virtualMemoryMb + + " physical-cores=" + cpuCores + " virtual-cores=" + virtualCores); super.init(conf); } @@ -192,12 +190,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private void registerWithRM() throws YarnRemoteException { Configuration conf = getConfig(); - long rmConnectWaitMS = + rmConnectWaitMS = conf.getInt( YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, YarnConfiguration.DEFAULT_RESOURCEMANAGER_CONNECT_WAIT_SECS) * 1000; - long rmConnectionRetryIntervalMS = + rmConnectionRetryIntervalMS = conf.getLong( YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, YarnConfiguration @@ -210,7 +208,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements " should not be negative."); } - boolean waitForEver = (rmConnectWaitMS == -1000); + waitForEver = (rmConnectWaitMS == -1000); if(! waitForEver) { if(rmConnectWaitMS < 0) { @@ -319,14 +317,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); - List containersStatuses = new ArrayList(); - if(previousHeartBeatSucceeded) { - previousContainersStatuses.clear(); - } else { - containersStatuses.addAll(previousContainersStatuses); - } - int numActiveContainers = 0; + List containersStatuses = new ArrayList(); for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); @@ -341,7 +333,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Sending out status for container: " + containerStatus); if (containerStatus.getState() == ContainerState.COMPLETE) { - previousContainersStatuses.add(containerStatus); // Remove i.remove(); @@ -404,6 +395,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements while (!isStopped) { // Send heartbeat try { + NodeHeartbeatResponse response = null; + int rmRetryCount = 0; + long waitStartTime = System.currentTimeMillis(); NodeStatus nodeStatus = getNodeStatus(); nodeStatus.setResponseId(lastHeartBeatID); @@ -414,9 +408,31 @@ public class NodeStatusUpdaterImpl extends AbstractService implements request.setLastKnownMasterKey(NodeStatusUpdaterImpl.this.context .getContainerTokenSecretManager().getCurrentKey()); } - NodeHeartbeatResponse response = - resourceTracker.nodeHeartbeat(request); - previousHeartBeatSucceeded = true; + while (!isStopped) { + try { + rmRetryCount++; + response = resourceTracker.nodeHeartbeat(request); + break; + } catch (Throwable e) { + LOG.warn("Trying to heartbeat to ResourceManager, " + + "current no. of failed attempts is " + rmRetryCount); + if(System.currentTimeMillis() - waitStartTime < rmConnectWaitMS + || waitForEver) { + try { + LOG.info("Sleeping for " + rmConnectionRetryIntervalMS/1000 + + " seconds before next heartbeat to RM"); + Thread.sleep(rmConnectionRetryIntervalMS); + } catch(InterruptedException ex) { + //done nothing + } + } else { + String errorMessage = "Failed to heartbeat to RM, " + + "no. of failed attempts is "+rmRetryCount; + LOG.error(errorMessage,e); + throw new YarnException(errorMessage,e); + } + } + } //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); // See if the master-key has rolled over @@ -432,7 +448,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements if (response.getNodeAction() == NodeAction.SHUTDOWN) { LOG .info("Recieved SHUTDOWN signal from Resourcemanager as part of heartbeat," + - " hence shutting down."); + " hence shutting down."); dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); break; @@ -461,8 +477,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements dispatcher.getEventHandler().handle( new CMgrCompletedAppsEvent(appsToCleanup)); } + } catch (YarnException e) { + //catch and throw the exception if tried MAX wait time to connect RM + dispatcher.getEventHandler().handle( + new NodeManagerEvent(NodeManagerEventType.SHUTDOWN)); + throw e; } catch (Throwable e) { - previousHeartBeatSucceeded = false; // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 31b980973ec..b0f3093f510 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -51,9 +52,11 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRemoteException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.RPCUtil; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; @@ -167,6 +170,10 @@ public class TestNodeStatusUpdater { throws YarnRemoteException { NodeStatus nodeStatus = request.getNodeStatus(); LOG.info("Got heartbeat number " + heartBeatID); + NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class); + Dispatcher mockDispatcher = mock(Dispatcher.class); + EventHandler mockEventHandler = mock(EventHandler.class); + when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler); nodeStatus.setResponseId(heartBeatID++); Map> appToContainers = getAppToContainerStatusMap(nodeStatus.getContainersStatuses()); @@ -183,7 +190,8 @@ public class TestNodeStatusUpdater { launchContext.setContainerId(firstContainerID); launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); launchContext.getResource().setMemory(2); - Container container = new ContainerImpl(conf , null, launchContext, null, null); + Container container = new ContainerImpl(conf , mockDispatcher, + launchContext, null, mockMetrics); this.context.getContainers().put(firstContainerID, container); } else if (heartBeatID == 2) { // Checks on the RM end @@ -207,7 +215,8 @@ public class TestNodeStatusUpdater { launchContext.setContainerId(secondContainerID); launchContext.setResource(recordFactory.newRecordInstance(Resource.class)); launchContext.getResource().setMemory(3); - Container container = new ContainerImpl(conf, null, launchContext, null, null); + Container container = new ContainerImpl(conf, mockDispatcher, + launchContext, null, mockMetrics); this.context.getContainers().put(secondContainerID, container); } else if (heartBeatID == 3) { // Checks on the RM end @@ -229,13 +238,14 @@ public class TestNodeStatusUpdater { } private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl { - public ResourceTracker resourceTracker = new MyResourceTracker(this.context); + public ResourceTracker resourceTracker; private Context context; public MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(context, dispatcher, healthChecker, metrics); this.context = context; + resourceTracker = new MyResourceTracker(this.context); } @Override @@ -312,6 +322,21 @@ public class TestNodeStatusUpdater { } } + private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl { + private ResourceTracker resourceTracker; + + public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + resourceTracker = new MyResourceTracker5(); + } + + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + } + private class MyNodeManager extends NodeManager { private MyNodeStatusUpdater3 nodeStatusUpdater; @@ -328,6 +353,32 @@ public class TestNodeStatusUpdater { } } + private class MyNodeManager2 extends NodeManager { + public boolean isStopped = false; + private NodeStatusUpdater nodeStatusUpdater; + private CyclicBarrier syncBarrier; + public MyNodeManager2 (CyclicBarrier syncBarrier) { + this.syncBarrier = syncBarrier; + } + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + nodeStatusUpdater = + new MyNodeStatusUpdater5(context, dispatcher, healthChecker, + metrics); + return nodeStatusUpdater; + } + + @Override + public void stop() { + super.stop(); + isStopped = true; + try { + syncBarrier.await(); + } catch (Exception e) { + } + } + } // private class MyResourceTracker2 implements ResourceTracker { public NodeAction heartBeatNodeAction = NodeAction.NORMAL; @@ -505,6 +556,26 @@ public class TestNodeStatusUpdater { } } + private class MyResourceTracker5 implements ResourceTracker { + public NodeAction registerNodeAction = NodeAction.NORMAL; + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(registerNodeAction ); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + heartBeatID++; + throw RPCUtil.getRemoteException("NodeHeartbeat exception"); + } + } + @Before public void clearError() { nmStartError = null; @@ -883,6 +954,30 @@ public class TestNodeStatusUpdater { nm.stop(); } + @Test(timeout = 20000) + public void testNodeStatusUpdaterRetryAndNMShutdown() + throws InterruptedException { + final long connectionWaitSecs = 1; + final long connectionRetryIntervalSecs = 1; + YarnConfiguration conf = createNMConfig(); + conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_WAIT_SECS, + connectionWaitSecs); + conf.setLong(YarnConfiguration + .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_SECS, + connectionRetryIntervalSecs); + CyclicBarrier syncBarrier = new CyclicBarrier(2); + nm = new MyNodeManager2(syncBarrier); + nm.init(conf); + nm.start(); + try { + syncBarrier.await(); + } catch (Exception e) { + } + Assert.assertTrue(((MyNodeManager2) nm).isStopped); + Assert.assertTrue("calculate heartBeatCount based on" + + " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2); + } + private class MyNMContext extends NMContext { ConcurrentMap containers = new ConcurrentSkipListMap();