From 1c4047b0e46e95a92509de2e59a93433f5968538 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Fri, 7 Mar 2014 22:36:47 +0000 Subject: [PATCH] YARN-1783. Fixed a bug in NodeManager's status-updater that was losing completed container statuses when NodeManager is forced to resync by the ResourceManager. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575437 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 6 +- .../yarn/server/nodemanager/NodeManager.java | 5 +- .../server/nodemanager/NodeStatusUpdater.java | 3 - .../nodemanager/NodeStatusUpdaterImpl.java | 107 +++++--- .../nodemanager/MockNodeStatusUpdater.java | 8 +- .../nodemanager/TestNodeManagerResync.java | 130 +++++++++- .../nodemanager/TestNodeStatusUpdater.java | 237 +++++++++--------- 7 files changed, 324 insertions(+), 172 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 9286a7ead82..75c90c9a35a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -411,11 +411,15 @@ Release 2.4.0 - UNRELEASED configuration-provider when booting up. (Xuan Gong via vinodkv) YARN-1768. Fixed error message being too verbose when killing a non-existent - application + application. (Tsuyoshi OZAWA via raviprak) YARN-1774. FS: Submitting to non-leaf queue throws NPE. (Anubhav Dhoot and Karthik Kambatla via kasha) + YARN-1783. Fixed a bug in NodeManager's status-updater that was losing + completed container statuses when NodeManager is forced to resync by the + ResourceManager. (Jian He via vinodkv) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 852c779d670..9688290acd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -229,7 +229,8 @@ public class NodeManager extends CompositeService containerManager.setBlockNewContainerRequests(true); LOG.info("Cleaning up running containers on resync"); containerManager.cleanupContainersOnNMResync(); - ((NodeStatusUpdaterImpl) nodeStatusUpdater).rebootNodeStatusUpdater(); + ((NodeStatusUpdaterImpl) nodeStatusUpdater) + .rebootNodeStatusUpdaterAndRegisterWithRM(); } catch (YarnRuntimeException e) { LOG.fatal("Error while rebooting NodeStatusUpdater.", e); shutDown(); @@ -243,7 +244,7 @@ public class NodeManager extends CompositeService private NodeId nodeId = null; private final ConcurrentMap applications = new ConcurrentHashMap(); - private final ConcurrentMap containers = + protected final ConcurrentMap containers = new ConcurrentSkipListMap(); private final NMContainerTokenSecretManager containerTokenSecretManager; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java index 69ac848db4a..2439450c9e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java @@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.nodemanager; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.api.records.ContainerId; -import org.apache.hadoop.yarn.server.api.records.NodeStatus; public interface NodeStatusUpdater extends Service { void sendOutofBandHeartBeat(); - NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId); - long getRMIdentifier(); public boolean isContainerRecentlyStopped(ContainerId containerId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index aaf6ceb02af..8293817a957 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -23,12 +23,14 @@ import java.net.ConnectException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Random; +import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -93,11 +95,19 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private Map appTokenKeepAliveMap = new HashMap(); private Random keepAliveDelayRandom = new Random(); - // It will be used to track recently stopped containers on node manager. + // It will be used to track recently stopped containers on node manager, this + // is to avoid the misleading no-such-container exception messages on NM, when + // the AM finishes it informs the RM to stop the may-be-already-completed + // containers. private final Map recentlyStoppedContainers; // Duration for which to track recently stopped container. private long durationToTrackStoppedContainers; + // This is used to track the current completed containers when nodeheartBeat + // is called. These completed containers will be removed from NM context after + // nodeHeartBeat succeeds and the response from the nodeHeartBeat is + // processed. + private final Set previousCompletedContainers; private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; @@ -114,6 +124,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); + this.previousCompletedContainers = new HashSet(); } @Override @@ -194,7 +205,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements super.serviceStop(); } - protected void rebootNodeStatusUpdater() { + protected void rebootNodeStatusUpdaterAndRegisterWithRM() { // Interrupt the updater. this.isStopped = true; @@ -235,8 +246,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @VisibleForTesting protected void registerWithRM() throws YarnException, IOException { - List containerStatuses = - this.updateAndGetContainerStatuses(); + List containerStatuses = getContainerStatuses(); RegisterNodeManagerRequest request = RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource, nodeManagerVersionId, containerStatuses); @@ -321,62 +331,72 @@ public class NodeStatusUpdaterImpl extends AbstractService implements return appList; } - @Override - public NodeStatus getNodeStatusAndUpdateContainersInContext( - int responseId) { + private NodeStatus getNodeStatus(int responseId) { NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); - nodeHealthStatus.setLastHealthReportTime( - healthChecker.getLastHealthReportTime()); + nodeHealthStatus.setLastHealthReportTime(healthChecker + .getLastHealthReportTime()); if (LOG.isDebugEnabled()) { LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy() - + ", " + nodeHealthStatus.getHealthReport()); + + ", " + nodeHealthStatus.getHealthReport()); } - List containersStatuses = updateAndGetContainerStatuses(); - LOG.debug(this.nodeId + " sending out status for " - + containersStatuses.size() + " containers"); - NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, - containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus); + List containersStatuses = getContainerStatuses(); + if (LOG.isDebugEnabled()) { + LOG.debug(this.nodeId + " sending out status for " + + containersStatuses.size() + " containers"); + } + NodeStatus nodeStatus = + NodeStatus.newInstance(nodeId, responseId, containersStatuses, + createKeepAliveApplicationList(), nodeHealthStatus); return nodeStatus; } - /* - * It will return current container statuses. If any container has - * COMPLETED then it will be removed from context. - */ - private List updateAndGetContainerStatuses() { + // Iterate through the NMContext and clone and get all the containers' + // statuses. If it's a completed container, add into the + // recentlyStoppedContainers and previousCompletedContainers collections. + @VisibleForTesting + protected List getContainerStatuses() { List containerStatuses = new ArrayList(); - for (Iterator> i = - this.context.getContainers().entrySet().iterator(); i.hasNext();) { - Entry e = i.next(); - ContainerId containerId = e.getKey(); - Container container = e.getValue(); - - // Clone the container to send it to the RM - org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = + for (Container container : this.context.getContainers().values()) { + org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); containerStatuses.add(containerStatus); - if (LOG.isDebugEnabled()) { - LOG.debug("Sending out status for container: " + containerStatus); - } - - if (containerStatus.getState() == ContainerState.COMPLETE) { - // Remove - i.remove(); + if (containerStatus.getState().equals(ContainerState.COMPLETE)) { // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. - addStoppedContainersToCache(containerId); - - LOG.info("Removed completed container " + containerId); + updateStoppedContainersInCache(container.getContainerId()); + addCompletedContainer(container); } } + if (LOG.isDebugEnabled()) { + LOG.debug("Sending out container statuses: " + containerStatuses); + } return containerStatuses; } + private void addCompletedContainer(Container container) { + synchronized (previousCompletedContainers) { + previousCompletedContainers.add(container.getContainerId()); + } + } + + private void removeCompletedContainersFromContext() { + synchronized (previousCompletedContainers) { + if (!previousCompletedContainers.isEmpty()) { + for (ContainerId containerId : previousCompletedContainers) { + this.context.getContainers().remove(containerId); + } + LOG.info("Removed completed containers from NM context: " + + previousCompletedContainers); + previousCompletedContainers.clear(); + } + } + } + private void trackAppsForKeepAlive(List appIds) { if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) { for (ApplicationId appId : appIds) { @@ -409,7 +429,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @Private @VisibleForTesting - public void addStoppedContainersToCache(ContainerId containerId) { + public void updateStoppedContainersInCache(ContainerId containerId) { synchronized (recentlyStoppedContainers) { removeVeryOldStoppedContainersFromCache(); recentlyStoppedContainers.put(containerId, @@ -457,8 +477,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Send heartbeat try { NodeHeartbeatResponse response = null; - NodeStatus nodeStatus = - getNodeStatusAndUpdateContainersInContext(lastHeartBeatID); + NodeStatus nodeStatus = getNodeStatus(lastHeartBeatID); NodeHeartbeatRequest request = NodeHeartbeatRequest.newInstance(nodeStatus, @@ -494,6 +513,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements break; } + // Explicitly put this method after checking the resync response. We + // don't want to remove the completed containers before resync + // because these completed containers will be reported back to RM + // when NM re-registers with RM. + removeCompletedContainersFromContext(); + lastHeartBeatID = response.getResponseId(); List containersToCleanup = response .getContainersToCleanup(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java index a3e1faf310e..3f4091c4dc2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/MockNodeStatusUpdater.java @@ -54,7 +54,11 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl { public MockNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { super(context, dispatcher, healthChecker, metrics); - resourceTracker = new MockResourceTracker(); + resourceTracker = createResourceTracker(); + } + + protected ResourceTracker createResourceTracker() { + return new MockResourceTracker(); } @Override @@ -66,7 +70,7 @@ public class MockNodeStatusUpdater extends NodeStatusUpdaterImpl { return; } - private static class MockResourceTracker implements ResourceTracker { + protected static class MockResourceTracker implements ResourceTracker { private int heartBeatID; @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java index 2f9e0a625aa..d3c981d1201 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java @@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException; @@ -43,9 +45,17 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.api.ResourceTracker; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest; +import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse; +import org.apache.hadoop.yarn.server.api.records.NodeAction; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; +import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container; import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; +import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -162,6 +172,118 @@ public class TestNodeManagerResync { } Assert.assertTrue("NM shutdown not called.",isNMShutdownCalled.get()); + nm.stop(); + } + + + // This is to test when NM gets the resync response from last heart beat, it + // should be able to send the already-sent-via-last-heart-beat container + // statuses again when it re-register with RM. + @Test + public void testNMSentContainerStatusOnResync() throws Exception { + final ContainerStatus testCompleteContainer = + TestNodeStatusUpdater.createContainerStatus(2, ContainerState.COMPLETE); + final Container container = + TestNodeStatusUpdater.getMockContainer(testCompleteContainer); + NodeManager nm = new NodeManager() { + int registerCount = 0; + + @Override + protected NodeStatusUpdater createNodeStatusUpdater(Context context, + Dispatcher dispatcher, NodeHealthCheckerService healthChecker) { + return new TestNodeStatusUpdaterResync(context, dispatcher, + healthChecker, metrics) { + @Override + protected ResourceTracker createResourceTracker() { + return new MockResourceTracker() { + @Override + public RegisterNodeManagerResponse registerNodeManager( + RegisterNodeManagerRequest request) throws YarnException, + IOException { + if (registerCount == 0) { + // first register, no containers info. + try { + Assert.assertEquals(0, request.getContainerStatuses() + .size()); + } catch (AssertionError error) { + error.printStackTrace(); + assertionFailedInThread.set(true); + } + // put the completed container into the context + getNMContext().getContainers().put( + testCompleteContainer.getContainerId(), container); + } else { + // second register contains the completed container info. + List statuses = + request.getContainerStatuses(); + try { + Assert.assertEquals(1, statuses.size()); + Assert.assertEquals(testCompleteContainer.getContainerId(), + statuses.get(0).getContainerId()); + } catch (AssertionError error) { + error.printStackTrace(); + assertionFailedInThread.set(true); + } + } + registerCount++; + return super.registerNodeManager(request); + } + + @Override + public NodeHeartbeatResponse nodeHeartbeat( + NodeHeartbeatRequest request) { + // first heartBeat contains the completed container info + List statuses = + request.getNodeStatus().getContainersStatuses(); + try { + Assert.assertEquals(1, statuses.size()); + Assert.assertEquals(testCompleteContainer.getContainerId(), + statuses.get(0).getContainerId()); + } catch (AssertionError error) { + error.printStackTrace(); + assertionFailedInThread.set(true); + } + + // notify RESYNC on first heartbeat. + return YarnServerBuilderUtils.newNodeHeartbeatResponse(1, + NodeAction.RESYNC, null, null, null, null, 1000L); + } + }; + } + }; + } + }; + YarnConfiguration conf = createNMConfig(); + nm.init(conf); + nm.start(); + + try { + syncBarrier.await(); + } catch (BrokenBarrierException e) { + } + Assert.assertFalse(assertionFailedInThread.get()); + nm.stop(); + } + + // This can be used as a common base class for testing NM resync behavior. + class TestNodeStatusUpdaterResync extends MockNodeStatusUpdater { + public TestNodeStatusUpdaterResync(Context context, Dispatcher dispatcher, + NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) { + super(context, dispatcher, healthChecker, metrics); + } + @Override + protected void rebootNodeStatusUpdaterAndRegisterWithRM() { + try { + // Wait here so as to sync with the main test thread. + super.rebootNodeStatusUpdaterAndRegisterWithRM(); + syncBarrier.await(); + } catch (InterruptedException e) { + } catch (BrokenBarrierException e) { + } catch (AssertionError ae) { + ae.printStackTrace(); + assertionFailedInThread.set(true); + } + } } private YarnConfiguration createNMConfig() { @@ -206,14 +328,14 @@ public class TestNodeManagerResync { } @Override - protected void rebootNodeStatusUpdater() { + protected void rebootNodeStatusUpdaterAndRegisterWithRM() { ConcurrentMap containers = getNMContext().getContainers(); try { // ensure that containers are empty before restart nodeStatusUpdater Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); + super.rebootNodeStatusUpdaterAndRegisterWithRM(); syncBarrier.await(); } catch (InterruptedException e) { } catch (BrokenBarrierException e) { @@ -278,7 +400,7 @@ public class TestNodeManagerResync { } @Override - protected void rebootNodeStatusUpdater() { + protected void rebootNodeStatusUpdaterAndRegisterWithRM() { ConcurrentMap containers = getNMContext().getContainers(); @@ -286,7 +408,7 @@ public class TestNodeManagerResync { try { // ensure that containers are empty before restart nodeStatusUpdater Assert.assertTrue(containers.isEmpty()); - super.rebootNodeStatusUpdater(); + super.rebootNodeStatusUpdaterAndRegisterWithRM(); // After this point new containers are free to be launched, except // containers from previous RM // Wait here so as to sync with the main test thread. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index f356a2ab839..76c544029c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -34,7 +34,6 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -117,8 +116,6 @@ public class TestNodeStatusUpdater { private boolean triggered = false; private Configuration conf; private NodeManager nm; - private boolean containerStatusBackupSuccessfully = true; - private List completedContainerStatusList = new ArrayList(); private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false); @Before @@ -304,6 +301,8 @@ public class TestNodeStatusUpdater { } } + // Test NodeStatusUpdater sends the right container statuses each time it + // heart beats. private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl { public ResourceTracker resourceTracker; @@ -555,6 +554,8 @@ public class TestNodeStatusUpdater { } } + // Test NodeStatusUpdater sends the right container statuses each time it + // heart beats. private class MyResourceTracker4 implements ResourceTracker { public NodeAction registerNodeAction = NodeAction.NORMAL; @@ -567,10 +568,9 @@ public class TestNodeStatusUpdater { @Override public RegisterNodeManagerResponse registerNodeManager( - RegisterNodeManagerRequest request) throws YarnException, - IOException { - RegisterNodeManagerResponse response = recordFactory - .newRecordInstance(RegisterNodeManagerResponse.class); + RegisterNodeManagerRequest request) throws YarnException, IOException { + RegisterNodeManagerResponse response = + recordFactory.newRecordInstance(RegisterNodeManagerResponse.class); response.setNodeAction(registerNodeAction); response.setContainerTokenMasterKey(createMasterKey()); response.setNMTokenMasterKey(createMasterKey()); @@ -583,67 +583,88 @@ public class TestNodeStatusUpdater { try { if (heartBeatID == 0) { Assert.assertEquals(request.getNodeStatus().getContainersStatuses() - .size(), 0); + .size(), 0); Assert.assertEquals(context.getContainers().size(), 0); } else if (heartBeatID == 1) { - Assert.assertEquals(request.getNodeStatus().getContainersStatuses() - .size(), 5); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(0).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(0) - .getContainerId().getId() == 1); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(1).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(1) - .getContainerId().getId() == 2); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(2).getState() == ContainerState.COMPLETE - && request.getNodeStatus().getContainersStatuses().get(2) - .getContainerId().getId() == 3); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(3).getState() == ContainerState.COMPLETE - && request.getNodeStatus().getContainersStatuses().get(3) - .getContainerId().getId() == 4); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(4).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(4) - .getContainerId().getId() == 5); - throw new java.net.ConnectException("Lost the heartbeat response"); + List statuses = + request.getNodeStatus().getContainersStatuses(); + Assert.assertEquals(statuses.size(), 2); + Assert.assertEquals(context.getContainers().size(), 2); + + ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + boolean container2Exist = false, container3Exist = false; + for (ContainerStatus status : statuses) { + if (status.getContainerId().equals( + containerStatus2.getContainerId())) { + Assert.assertTrue(status.getState().equals( + containerStatus2.getState())); + container2Exist = true; + } + if (status.getContainerId().equals( + containerStatus3.getContainerId())) { + Assert.assertTrue(status.getState().equals( + containerStatus3.getState())); + container3Exist = true; + } + } + Assert.assertTrue(container2Exist && container3Exist); + + // should throw exception that can be retried by the + // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the + // test passes. + throw new YarnRuntimeException("Lost the heartbeat response"); } else if (heartBeatID == 2) { - Assert.assertEquals(request.getNodeStatus().getContainersStatuses() - .size(), 7); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(0).getState() == ContainerState.COMPLETE - && request.getNodeStatus().getContainersStatuses().get(0) - .getContainerId().getId() == 3); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(1).getState() == ContainerState.COMPLETE - && request.getNodeStatus().getContainersStatuses().get(1) - .getContainerId().getId() == 4); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(2).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(2) - .getContainerId().getId() == 1); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(3).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(3) - .getContainerId().getId() == 2); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(4).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(4) - .getContainerId().getId() == 5); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(5).getState() == ContainerState.RUNNING - && request.getNodeStatus().getContainersStatuses().get(5) - .getContainerId().getId() == 6); - Assert.assertTrue(request.getNodeStatus().getContainersStatuses() - .get(6).getState() == ContainerState.COMPLETE - && request.getNodeStatus().getContainersStatuses().get(6) - .getContainerId().getId() == 7); + List statuses = + request.getNodeStatus().getContainersStatuses(); + Assert.assertEquals(statuses.size(), 4); + Assert.assertEquals(context.getContainers().size(), 4); + + ContainerStatus containerStatus2 = + createContainerStatus(2, ContainerState.RUNNING); + ContainerStatus containerStatus3 = + createContainerStatus(3, ContainerState.COMPLETE); + ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.RUNNING); + ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.COMPLETE); + + boolean container2Exist = false, container3Exist = false, container4Exist = + false, container5Exist = false; + for (ContainerStatus status : statuses) { + if (status.getContainerId().equals( + containerStatus2.getContainerId())) { + Assert.assertTrue(status.getState().equals( + containerStatus2.getState())); + container2Exist = true; + } + if (status.getContainerId().equals( + containerStatus3.getContainerId())) { + Assert.assertTrue(status.getState().equals( + containerStatus3.getState())); + container3Exist = true; + } + if (status.getContainerId().equals( + containerStatus4.getContainerId())) { + Assert.assertTrue(status.getState().equals( + containerStatus4.getState())); + container4Exist = true; + } + if (status.getContainerId().equals( + containerStatus5.getContainerId())) { + Assert.assertTrue(status.getState().equals( + containerStatus5.getState())); + container5Exist = true; + } + } + Assert.assertTrue(container2Exist && container3Exist + && container4Exist && container5Exist); } } catch (AssertionError error) { - LOG.info(error); - containerStatusBackupSuccessfully = false; + error.printStackTrace(); + assertionFailedInThread.set(true); } finally { heartBeatID++; } @@ -651,9 +672,7 @@ public class TestNodeStatusUpdater { nodeStatus.setResponseId(heartBeatID); NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID, - heartBeatNodeAction, - null, null, null, - null, 1000L); + heartBeatNodeAction, null, null, null, null, 1000L); return nhResponse; } } @@ -761,7 +780,7 @@ public class TestNodeStatusUpdater { ContainerId cId = ContainerId.newInstance(appAttemptId, 0); - nodeStatusUpdater.addStoppedContainersToCache(cId); + nodeStatusUpdater.updateStoppedContainersInCache(cId); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); long time1 = System.currentTimeMillis(); @@ -1119,7 +1138,8 @@ public class TestNodeStatusUpdater { } /** - * Test completed containerStatus get back up when heart beat lost + * Test completed containerStatus get back up when heart beat lost, and will + * be sent via next heart beat. */ @Test(timeout = 200000) public void testCompletedContainerStatusBackup() throws Exception { @@ -1150,7 +1170,7 @@ public class TestNodeStatusUpdater { while (heartBeatID <= 3 && waitCount++ != 20) { Thread.sleep(500); } - if(!containerStatusBackupSuccessfully) { + if(assertionFailedInThread.get()) { Assert.fail("ContainerStatus Backup failed"); } nm.stop(); @@ -1239,9 +1259,8 @@ public class TestNodeStatusUpdater { nm.stop(); } + // Add new containers info into NM context each time node heart beats. private class MyNMContext extends NMContext { - ConcurrentMap containers = - new ConcurrentSkipListMap(); public MyNMContext( NMContainerTokenSecretManager containerTokenSecretManager, @@ -1254,11 +1273,6 @@ public class TestNodeStatusUpdater { if (heartBeatID == 0) { return containers; } else if (heartBeatID == 1) { - ContainerStatus containerStatus1 = - createContainerStatus(1, ContainerState.RUNNING); - Container container1 = getMockContainer(containerStatus1); - containers.put(containerStatus1.getContainerId(), container1); - ContainerStatus containerStatus2 = createContainerStatus(2, ContainerState.RUNNING); Container container2 = getMockContainer(containerStatus2); @@ -1268,60 +1282,45 @@ public class TestNodeStatusUpdater { createContainerStatus(3, ContainerState.COMPLETE); Container container3 = getMockContainer(containerStatus3); containers.put(containerStatus3.getContainerId(), container3); - completedContainerStatusList.add(containerStatus3); - - ContainerStatus containerStatus4 = - createContainerStatus(4, ContainerState.COMPLETE); - Container container4 = getMockContainer(containerStatus4); - containers.put(containerStatus4.getContainerId(), container4); - completedContainerStatusList.add(containerStatus4); - - ContainerStatus containerStatus5 = - createContainerStatus(5, ContainerState.RUNNING); - Container container5 = getMockContainer(containerStatus5); - containers.put(containerStatus5.getContainerId(), container5); - return containers; } else if (heartBeatID == 2) { - ContainerStatus containerStatus6 = - createContainerStatus(6, ContainerState.RUNNING); - Container container6 = getMockContainer(containerStatus6); - containers.put(containerStatus6.getContainerId(), container6); - - ContainerStatus containerStatus7 = - createContainerStatus(7, ContainerState.COMPLETE); - Container container7 = getMockContainer(containerStatus7); - containers.put(containerStatus7.getContainerId(), container7); - completedContainerStatusList.add(containerStatus7); + ContainerStatus containerStatus4 = + createContainerStatus(4, ContainerState.RUNNING); + Container container4 = getMockContainer(containerStatus4); + containers.put(containerStatus4.getContainerId(), container4); + ContainerStatus containerStatus5 = + createContainerStatus(5, ContainerState.COMPLETE); + Container container5 = getMockContainer(containerStatus5); + containers.put(containerStatus5.getContainerId(), container5); return containers; } else { containers.clear(); - return containers; } } + } - private ContainerStatus createContainerStatus(int id, - ContainerState containerState) { - ApplicationId applicationId = - BuilderUtils.newApplicationId(System.currentTimeMillis(), id); - ApplicationAttemptId applicationAttemptId = - BuilderUtils.newApplicationAttemptId(applicationId, id); - ContainerId contaierId = - BuilderUtils.newContainerId(applicationAttemptId, id); - ContainerStatus containerStatus = - BuilderUtils.newContainerStatus(contaierId, containerState, - "test_containerStatus: id=" + id + ", containerState: " - + containerState, 0); - return containerStatus; - } + public static ContainerStatus createContainerStatus(int id, + ContainerState containerState) { + ApplicationId applicationId = ApplicationId.newInstance(0, 1); + ApplicationAttemptId applicationAttemptId = + ApplicationAttemptId.newInstance(applicationId, 1); + ContainerId contaierId = ContainerId.newInstance(applicationAttemptId, id); + ContainerStatus containerStatus = + BuilderUtils.newContainerStatus(contaierId, containerState, + "test_containerStatus: id=" + id + ", containerState: " + + containerState, 0); + return containerStatus; + } - private Container getMockContainer(ContainerStatus containerStatus) { - Container container = mock(Container.class); - when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus); - return container; - } + public static Container getMockContainer(ContainerStatus containerStatus) { + ContainerImpl container = mock(ContainerImpl.class); + when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus); + when(container.getCurrentState()).thenReturn(containerStatus.getState()); + when(container.getContainerId()).thenReturn( + containerStatus.getContainerId()); + return container; } private void verifyNodeStartFailure(String errMessage) throws Exception {