From b5e6ae97f46ecbec8bae927526370493ab30050f Mon Sep 17 00:00:00 2001 From: Adam Antal Date: Mon, 5 Oct 2020 10:09:14 +0200 Subject: [PATCH] YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan (cherry picked from commit a1f7e760dffeeffb9cc739f734c0a91b81a0c9d0) --- .../nodemanager/NodeStatusUpdaterImpl.java | 20 ++++++++++++++-- .../nodemanager/TestNodeStatusUpdater.java | 24 ++++++------------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 8ed55028138..9f0551895d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -682,7 +682,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @VisibleForTesting @Private public void removeOrTrackCompletedContainersFromContext( - List containerIds) throws IOException { + List containerIds) { Set removedContainers = new HashSet(); pendingContainersToRemove.addAll(containerIds); @@ -699,13 +699,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements removedContainers.add(containerId); iter.remove(); } + pendingCompletedContainers.remove(containerId); } if (!removedContainers.isEmpty()) { LOG.info("Removed completed containers from NM context: " + removedContainers); } - pendingCompletedContainers.clear(); } private void trackAppsForKeepAlive(List appIds) { @@ -1077,6 +1077,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements @SuppressWarnings("unchecked") public void run() { int lastHeartbeatID = 0; + boolean missedHearbeat = false; while (!isStopped) { // Send heartbeat try { @@ -1123,6 +1124,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements removeOrTrackCompletedContainersFromContext(response .getContainersToBeRemovedFromNM()); + // If the last heartbeat was missed, it is possible that the + // RM saw this one as a duplicate and did not process it. + // If so, we can fail to notify the RM of these completed containers + // on the next heartbeat if we clear pendingCompletedContainers. + // If it wasn't a duplicate, the only impact is we might notify + // the RM twice, which it can handle. + if (!missedHearbeat) { + pendingCompletedContainers.clear(); + } else { + LOG.info("skipped clearing pending completed containers due to " + + "missed heartbeat"); + missedHearbeat = false; + } + logAggregationReportForAppsTempList.clear(); lastHeartbeatID = response.getResponseId(); List containersToCleanup = response @@ -1199,6 +1214,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements // TODO Better error handling. Thread can die with the rest of the // NM still running. LOG.error("Caught exception in status-updater", e); + missedHearbeat = true; } finally { synchronized (heartbeatMonitor) { nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 35c59694548..6bc2c8b2829 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -694,15 +694,11 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { } else if (heartBeatID == 2 || heartBeatID == 3) { List statuses = request.getNodeStatus().getContainersStatuses(); - if (heartBeatID == 2) { - // NM should send completed containers again, since the last - // heartbeat is lost. - Assert.assertEquals(4, statuses.size()); - } else { - // NM should not send completed containers again, since the last - // heartbeat is successful. - Assert.assertEquals(2, statuses.size()); - } + // NM should send completed containers on heartbeat 2, + // since heartbeat 1 was lost. It will send them again on + // heartbeat 3, because it does not clear them if the previous + // heartbeat was lost in case the RM treated it as a duplicate. + Assert.assertEquals(4, statuses.size()); Assert.assertEquals(4, context.getContainers().size()); boolean container2Exist = false, container3Exist = false, @@ -733,14 +729,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase { container5Exist = true; } } - if (heartBeatID == 2) { - Assert.assertTrue(container2Exist && container3Exist - && container4Exist && container5Exist); - } else { - // NM do not send completed containers again - Assert.assertTrue(container2Exist && !container3Exist - && container4Exist && !container5Exist); - } + Assert.assertTrue(container2Exist && container3Exist + && container4Exist && container5Exist); if (heartBeatID == 3) { finishedContainersPulledByAM.add(containerStatus3.getContainerId());