From 3f8da2a9eb1436886de405f99161405eefa1daab Mon Sep 17 00:00:00 2001 From: Jian He Date: Thu, 8 Jan 2015 11:12:54 -0800 Subject: [PATCH] YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed container statuses on heartbeat. Contributed by Chengbing Liu (cherry picked from commit cc2a745f7e82c9fa6de03242952347c54c52dccc) (cherry picked from commit e7e6173049adca2a2ae0e1231adcaca8168bec27) (cherry picked from commit 3c4ed2497b14140f09b3cae4959be6474c4cdc99) --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../nodemanager/NodeStatusUpdaterImpl.java | 39 +++++++++----- .../nodemanager/TestNodeStatusUpdater.java | 52 ++++++++++++------- 3 files changed, 64 insertions(+), 30 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2d4af90a0e2..3a8911efa52 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -63,6 +63,9 @@ Release 2.6.1 - UNRELEASED YARN-2922. ConcurrentModificationException in CapacityScheduler's LeafQueue. (Rohith Sharmaks via ozawa) + YARN-2997. Fixed NodeStatusUpdater to not send already-sent completed + container statuses on heartbeat. 
(Chengbing Liu via jianhe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index f561dbb4760..6ddd7e4af5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -106,6 +106,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // the AM finishes it informs the RM to stop the may-be-already-completed // containers. private final Map recentlyStoppedContainers; + // Save the reported completed containers in case of lost heartbeat responses. + // These completed containers will be sent again till a successful response. + private final Map pendingCompletedContainers; // Duration for which to track recently stopped container. 
private long durationToTrackStoppedContainers; @@ -126,6 +129,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements this.metrics = metrics; this.recentlyStoppedContainers = new LinkedHashMap(); + this.pendingCompletedContainers = + new HashMap(); } @Override @@ -358,11 +363,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements List containerStatuses = new ArrayList(); for (Container container : this.context.getContainers().values()) { ContainerId containerId = container.getContainerId(); - ApplicationId applicationId = container.getContainerId() - .getApplicationAttemptId().getApplicationId(); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus = container.cloneAndGetContainerStatus(); - containerStatuses.add(containerStatus); if (containerStatus.getState() == ContainerState.COMPLETE) { if (isApplicationStopped(applicationId)) { if (LOG.isDebugEnabled()) { @@ -370,14 +374,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements + containerId + " from NM context."); } context.getContainers().remove(containerId); + pendingCompletedContainers.put(containerId, containerStatus); } else { - // Adding to finished containers cache. Cache will keep it around at - // least for #durationToTrackStoppedContainers duration. In the - // subsequent call to stop container it will get removed from cache. - addCompletedContainer(container.getContainerId()); + if (!isContainerRecentlyStopped(containerId)) { + pendingCompletedContainers.put(containerId, containerStatus); + // Adding to finished containers cache. Cache will keep it around at + // least for #durationToTrackStoppedContainers duration. In the + // subsequent call to stop container it will get removed from cache. 
+ addCompletedContainer(containerId); + } } + } else { + containerStatuses.add(containerStatus); } } + containerStatuses.addAll(pendingCompletedContainers.values()); if (LOG.isDebugEnabled()) { LOG.debug("Sending out " + containerStatuses.size() + " container statuses: " + containerStatuses); @@ -397,8 +408,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements new ArrayList(); for (Container container : this.context.getContainers().values()) { ContainerId containerId = container.getContainerId(); - ApplicationId applicationId = container.getContainerId() - .getApplicationAttemptId().getApplicationId(); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); if (!this.context.getApplications().containsKey(applicationId)) { context.getContainers().remove(containerId); continue; @@ -410,7 +421,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // Adding to finished containers cache. Cache will keep it around at // least for #durationToTrackStoppedContainers duration. In the // subsequent call to stop container it will get removed from cache. 
- addCompletedContainer(container.getContainerId()); + addCompletedContainer(containerId); } } LOG.info("Sending out " + containerStatuses.size() @@ -457,7 +468,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements ContainerId containerId = iter.next(); // remove the container only if the container is at DONE state Container nmContainer = context.getContainers().get(containerId); - if (nmContainer != null && nmContainer.getContainerState().equals( + if (nmContainer == null) { + iter.remove(); + } else if (nmContainer.getContainerState().equals( org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { context.getContainers().remove(containerId); removedContainers.add(containerId); @@ -469,6 +482,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements LOG.info("Removed completed containers from NM context: " + removedContainers); } + pendingCompletedContainers.clear(); } private void trackAppsForKeepAlive(List appIds) { @@ -507,7 +521,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements recentlyStoppedContainers.clear(); } } - + @Private @VisibleForTesting public void removeVeryOldStoppedContainersFromCache() { @@ -605,6 +619,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements ResourceManagerConstants.RM_INVALID_IDENTIFIER; dispatcher.getEventHandler().handle( new NodeManagerEvent(NodeManagerEventType.RESYNC)); + pendingCompletedContainers.clear(); break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index e367085d8ee..46d7b101a63 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -610,14 +610,14 @@ public class TestNodeStatusUpdater { (); try { if (heartBeatID == 0) { - Assert.assertEquals(request.getNodeStatus().getContainersStatuses() - .size(), 0); - Assert.assertEquals(context.getContainers().size(), 0); + Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses() + .size()); + Assert.assertEquals(0, context.getContainers().size()); } else if (heartBeatID == 1) { List statuses = request.getNodeStatus().getContainersStatuses(); - Assert.assertEquals(statuses.size(), 2); - Assert.assertEquals(context.getContainers().size(), 2); + Assert.assertEquals(2, statuses.size()); + Assert.assertEquals(2, context.getContainers().size()); boolean container2Exist = false, container3Exist = false; for (ContainerStatus status : statuses) { @@ -643,8 +643,16 @@ public class TestNodeStatusUpdater { } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); - Assert.assertEquals(statuses.size(), 4); - Assert.assertEquals(context.getContainers().size(), 4); + if (heartBeatID == 2) { + // NM should send completed containers again, since the last + // heartbeat is lost. + Assert.assertEquals(4, statuses.size()); + } else { + // NM should not send completed containers again, since the last + // heartbeat is successful. 
+ Assert.assertEquals(2, statuses.size()); + } + Assert.assertEquals(4, context.getContainers().size()); boolean container2Exist = false, container3Exist = false, container4Exist = false, container5Exist = false; @@ -674,8 +682,14 @@ public class TestNodeStatusUpdater { container5Exist = true; } } - Assert.assertTrue(container2Exist && container3Exist - && container4Exist && container5Exist); + if (heartBeatID == 2) { + Assert.assertTrue(container2Exist && container3Exist + && container4Exist && container5Exist); + } else { + // NM do not send completed containers again + Assert.assertTrue(container2Exist && !container3Exist + && container4Exist && !container5Exist); + } if (heartBeatID == 3) { finishedContainersPulledByAM.add(containerStatus3.getContainerId()); @@ -683,8 +697,9 @@ public class TestNodeStatusUpdater { } else if (heartBeatID == 4) { List statuses = request.getNodeStatus().getContainersStatuses(); - Assert.assertEquals(statuses.size(), 3); - Assert.assertEquals(context.getContainers().size(), 3); + Assert.assertEquals(2, statuses.size()); + // Container 3 is acked by AM, hence removed from context + Assert.assertEquals(3, context.getContainers().size()); boolean container3Exist = false; for (ContainerStatus status : statuses) { @@ -917,13 +932,14 @@ public class TestNodeStatusUpdater { nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers); Set containerIdSet = new HashSet(); - for (ContainerStatus status : nodeStatusUpdater.getContainerStatuses()) { + List containerStatuses = nodeStatusUpdater.getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { containerIdSet.add(status.getContainerId()); } - Assert.assertTrue(nodeStatusUpdater.getContainerStatuses().size() == 1); + Assert.assertEquals(1, containerStatuses.size()); // completed container is removed; - Assert.assertFalse(containerIdSet.contains(anyCompletedContainer)); + Assert.assertFalse(containerIdSet.contains(cId)); // running container is not 
removed; Assert.assertTrue(containerIdSet.contains(runningContainerId)); } @@ -967,15 +983,15 @@ public class TestNodeStatusUpdater { when(application.getApplicationState()).thenReturn( ApplicationState.FINISHING_CONTAINERS_WAIT); - // The completed container will be sent one time. Then we will delete it. + // The completed container will be saved in case of lost heartbeat. + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); - Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); nm.getNMContext().getContainers().put(cId, anyCompletedContainer); nm.getNMContext().getApplications().remove(appId); - // The completed container will be sent one time. Then we will delete it. + // The completed container will be saved in case of lost heartbeat. + Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size()); - Assert.assertEquals(0, nodeStatusUpdater.getContainerStatuses().size()); } @Test