diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 5ea81f843e3..49dd021fc2b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -703,7 +703,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @VisibleForTesting @Private public void removeOrTrackCompletedContainersFromContext( - List containerIds) throws IOException { + List containerIds) { Set removedContainers = new HashSet(); pendingContainersToRemove.addAll(containerIds); @@ -720,13 +720,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements removedContainers.add(containerId); iter.remove(); } + pendingCompletedContainers.remove(containerId); } if (!removedContainers.isEmpty()) { LOG.info("Removed completed containers from NM context: " + removedContainers); } - pendingCompletedContainers.clear(); } private void trackAppsForKeepAlive(List appIds) { @@ -1301,6 +1301,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @SuppressWarnings("unchecked") public void run() { int lastHeartbeatID = 0; + boolean missedHearbeat = false; while (!isStopped) { // Send heartbeat try { @@ -1354,6 +1355,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); + // If the last heartbeat was missed, it is possible that the + // RM saw this one as a duplicate and did not process it. + // If so, we can fail to notify the RM of these completed containers + // on the next heartbeat if we clear pendingCompletedContainers. + // If it wasn't a duplicate, the only impact is we might notify + // the RM twice, which it can handle. + if (!missedHearbeat) { + pendingCompletedContainers.clear(); + } else { + LOG.info("skipped clearing pending completed containers due to " + + "missed heartbeat"); + missedHearbeat = false; + } + logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); List containersToCleanup = response @@ -1433,6 +1448,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); + missedHearbeat = true; } finally { synchronized (heartbeatMonitor) { nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 2477af25123..28344f9c947 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -721,15 +721,11 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); - if (heartBeatID == 2) { - // NM should send completed containers again, since the last - // heartbeat is lost. - Assert.assertEquals(4, statuses.size()); - } else { - // NM should not send completed containers again, since the last - // heartbeat is successful. - Assert.assertEquals(2, statuses.size()); - } + // NM should send completed containers on heartbeat 2, + // since heartbeat 1 was lost. It will send them again on + // heartbeat 3, because it does not clear them if the previous + // heartbeat was lost in case the RM treated it as a duplicate. + Assert.assertEquals(4, statuses.size()); Assert.assertEquals(4, context.getContainers().size()); boolean container2Exist = false, container3Exist = false, @@ -760,14 +756,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { container5Exist = true; } } - if (heartBeatID == 2) { - Assert.assertTrue(container2Exist && container3Exist - && container4Exist && container5Exist); - } else { - // NM do not send completed containers again - Assert.assertTrue(container2Exist && !container3Exist - && container4Exist && !container5Exist); - } + Assert.assertTrue(container2Exist && container3Exist + && container4Exist && container5Exist); if (heartBeatID == 3) { finishedContainersPulledByAM.add(containerStatus3.getContainerId());