From 3e9200ddde4858be8ecdd8347b5fee63ed83df84 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 3 Apr 2013 16:57:07 +0000 Subject: [PATCH] YARN-101. Fix NodeManager heartbeat processing to not lose track of completed containers in case of dropped heartbeats. Contributed by Xuan Gong. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1464105 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../yarn/server/nodemanager/NodeManager.java | 6 +- .../nodemanager/NodeStatusUpdaterImpl.java | 16 +- .../nodemanager/TestNodeStatusUpdater.java | 242 ++++++++++++++++++ 4 files changed, 265 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 501509ac83a..f4c7a94a393 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -183,6 +183,9 @@ Release 2.0.5-beta - UNRELEASED local directory hits unix file count limits and thus prevent job failures. (Omkar Vinit Joshi via vinodkv) + YARN-101. Fix NodeManager heartbeat processing to not lose track of completed + containers in case of dropped heartbeats. (Xuan Gong via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 7a53eb9033a..867a02d2388 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -119,6 +119,10 @@ public class NodeManager extends CompositeService return new DeletionService(exec); } + protected NMContext createNMContext(NMContainerTokenSecretManager containerTokenSecretManager) { + return new NMContext(containerTokenSecretManager); + } + protected void doSecureLogin() throws IOException { SecurityUtil.login(getConfig(), YarnConfiguration.NM_KEYTAB, YarnConfiguration.NM_PRINCIPAL); @@ -137,7 +141,7 @@ public class NodeManager extends CompositeService containerTokenSecretManager = new NMContainerTokenSecretManager(conf); } - this.context = new NMContext(containerTokenSecretManager); + this.context = createNMContext(containerTokenSecretManager); this.aclsManager = new ApplicationACLsManager(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5265cf10ef1..8d424bf3449 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -88,6 +88,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; + private boolean previousHeartBeatSucceeded; + private List previousContainersStatuses = + new ArrayList(); + public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(NodeStatusUpdaterImpl.class.getName()); @@ -95,6 +99,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.context = context; this.dispatcher = dispatcher; this.metrics = metrics; + this.previousHeartBeatSucceeded = true; } @Override @@ -314,8 +319,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class); nodeStatus.setNodeId(this.nodeId); - int numActiveContainers = 0; List containersStatuses = new ArrayList(); + if(previousHeartBeatSucceeded) { + previousContainersStatuses.clear(); + } else { + containersStatuses.addAll(previousContainersStatuses); + } + + int numActiveContainers = 0; for (Iterator> i = this.context.getContainers().entrySet().iterator(); i.hasNext();) { Entry e = i.next(); @@ -330,6 +341,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Sending out status for container: " + containerStatus); if (containerStatus.getState() == ContainerState.COMPLETE) { + previousContainersStatuses.add(containerStatus); // Remove i.remove(); @@ -404,6 +416,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } NodeHeartbeatResponse response = resourceTracker.nodeHeartbeat(request); + previousHeartBeatSucceeded = true; //get next heartbeat interval from response nextHeartBeatInterval = response.getNextHeartBeatInterval(); // See if the master-key has rolled over @@ -449,6 +462,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new CMgrCompletedAppsEvent(appsToCleanup)); } } catch (Throwable e) { + previousHeartBeatSucceeded = false; // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 59b65e24d62..31b980973ec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.nodemanager; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.io.IOException; import java.net.InetSocketAddress; @@ -29,6 +30,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -43,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; @@ -58,11 +61,13 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequ import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; +import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.apache.hadoop.yarn.service.Service; @@ -92,6 +97,8 @@ public class TestNodeStatusUpdater { private final Configuration conf = createNMConfig(); private NodeManager nm; protected NodeManager rebootedNodeManager; + private boolean containerStatusBackupSuccessfully = true; + private List completedContainerStatusList = new ArrayList(); @After public void tearDown() { @@ -237,6 +244,22 @@ public class TestNodeStatusUpdater { } } + private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { + public ResourceTracker resourceTracker; + + public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + resourceTracker = new MyResourceTracker4(context); + } + + @Override + protected ResourceTracker getRMClient() { + return resourceTracker; + } + + } + private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl { public ResourceTracker resourceTracker; private Context context; @@ -384,6 +407,104 @@ public class TestNodeStatusUpdater { } } + private class MyResourceTracker4 implements ResourceTracker { + + public NodeAction registerNodeAction = NodeAction.NORMAL; + public NodeAction heartBeatNodeAction = NodeAction.NORMAL; + private Context context; + + public MyResourceTracker4(Context context) { + this.context = context; + } + + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnRemoteException { + RegisterNodeManagerResponse response = recordFactory + .newRecordInstance(RegisterNodeManagerResponse.class); + response.setNodeAction(registerNodeAction); + return response; + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request) + throws YarnRemoteException { + try { + if (heartBeatID == 0) { + Assert.assertEquals(request.getNodeStatus().getContainersStatuses() + .size(), 0); + Assert.assertEquals(context.getContainers().size(), 0); + } else if (heartBeatID == 1) { + Assert.assertEquals(request.getNodeStatus().getContainersStatuses() + .size(), 5); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(0).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(0) + .getContainerId().getId() == 1); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(1).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(1) + .getContainerId().getId() == 2); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(2).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(2) + .getContainerId().getId() == 3); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(3).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(3) + .getContainerId().getId() == 4); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(4).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(4) + .getContainerId().getId() == 5); + throw new YarnException("Lost the heartbeat response"); + } else if (heartBeatID == 2) { + Assert.assertEquals(request.getNodeStatus().getContainersStatuses() + .size(), 7); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(0).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(0) + .getContainerId().getId() == 3); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(1).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(1) + .getContainerId().getId() == 4); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(2).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(2) + .getContainerId().getId() == 1); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(3).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(3) + .getContainerId().getId() == 2); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(4).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(4) + .getContainerId().getId() == 5); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(5).getState() == ContainerState.RUNNING + && request.getNodeStatus().getContainersStatuses().get(5) + .getContainerId().getId() == 6); + Assert.assertTrue(request.getNodeStatus().getContainersStatuses() + .get(6).getState() == ContainerState.COMPLETE + && request.getNodeStatus().getContainersStatuses().get(6) + .getContainerId().getId() == 7); + } + } catch (AssertionError error) { + LOG.info(error); + containerStatusBackupSuccessfully = false; + } finally { + heartBeatID++; + } + NodeStatus nodeStatus = request.getNodeStatus(); + nodeStatus.setResponseId(heartBeatID); + NodeHeartbeatResponse nhResponse = + YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, + heartBeatNodeAction, null, null, null, 1000L); + return nhResponse; + } + } + @Before public void clearError() { nmStartError = null; @@ -725,6 +846,127 @@ public class TestNodeStatusUpdater { } } + /** + * Test completed containerStatus get back up when heart beat lost + */ + @Test(timeout = 20000) + public void testCompletedContainerStatusBackup() throws Exception { + nm = new NodeManager() { + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + MyNodeStatusUpdater2 myNodeStatusUpdater = + new MyNodeStatusUpdater2(context, dispatcher, healthChecker, + metrics); + return myNodeStatusUpdater; + } + + @Override + protected NMContext createNMContext( + NMContainerTokenSecretManager containerTokenSecretManager) { + return new MyNMContext(containerTokenSecretManager); + } + + }; + + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + int waitCount = 0; + while (heartBeatID <= 3 && waitCount++ != 20) { + Thread.sleep(500); + } + if(!containerStatusBackupSuccessfully) { + Assert.fail("ContainerStatus Backup failed"); + } + nm.stop(); + } + + private class MyNMContext extends NMContext { + ConcurrentMap containers = + new ConcurrentSkipListMap(); + + public MyNMContext(NMContainerTokenSecretManager + containerTokenSecretManager) { + super(containerTokenSecretManager); + } + + @Override + public ConcurrentMap getContainers() { + if (heartBeatID == 0) { + return containers; + } else if (heartBeatID == 1) { + ContainerStatus containerStatus1 = + createContainerStatus(1, ContainerState.RUNNING); + Container container1 = getMockContainer(containerStatus1); + containers.put(containerStatus1.getContainerId(), container1); + + ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + Container container2 = getMockContainer(containerStatus2); + containers.put(containerStatus2.getContainerId(), container2); + + ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + Container container3 = getMockContainer(containerStatus3); + containers.put(containerStatus3.getContainerId(), container3); + completedContainerStatusList.add(containerStatus3); + + ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.COMPLETE); + Container container4 = getMockContainer(containerStatus4); + containers.put(containerStatus4.getContainerId(), container4); + completedContainerStatusList.add(containerStatus4); + + ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.RUNNING); + Container container5 = getMockContainer(containerStatus5); + containers.put(containerStatus5.getContainerId(), container5); + + return containers; + } else if (heartBeatID == 2) { + ContainerStatus containerStatus6 = + createContainerStatus(6, ContainerState.RUNNING); + Container container6 = getMockContainer(containerStatus6); + containers.put(containerStatus6.getContainerId(), container6); + + ContainerStatus containerStatus7 = + createContainerStatus(7, ContainerState.COMPLETE); + Container container7 = getMockContainer(containerStatus7); + containers.put(containerStatus7.getContainerId(), container7); + completedContainerStatusList.add(containerStatus7); + + return containers; + } else { + containers.clear(); + + return containers; + } + } + + private ContainerStatus createContainerStatus(int id, + ContainerState containerState) { + ApplicationId applicationId = + BuilderUtils.newApplicationId(System.currentTimeMillis(), id); + ApplicationAttemptId applicationAttemptId = + BuilderUtils.newApplicationAttemptId(applicationId, id); + ContainerId contaierId = + BuilderUtils.newContainerId(applicationAttemptId, id); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(contaierId, containerState, + "test_containerStatus: id=" + id + ", containerState: " + + containerState, 0); + return containerStatus; + } + + private Container getMockContainer(ContainerStatus containerStatus) { + Container container = mock(Container.class); + when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus); + return container; + } + } + private void verifyNodeStartFailure(String errMessage) { YarnConfiguration conf = createNMConfig(); nm.init(conf);