YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan

(cherry picked from commit a1f7e760df)
This commit is contained in:
Adam Antal 2020-10-05 10:09:14 +02:00
parent d80dfad900
commit 3ae78e40bf
2 changed files with 25 additions and 19 deletions

View File

@ -703,7 +703,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting @VisibleForTesting
@Private @Private
public void removeOrTrackCompletedContainersFromContext( public void removeOrTrackCompletedContainersFromContext(
List<ContainerId> containerIds) throws IOException { List<ContainerId> containerIds) {
Set<ContainerId> removedContainers = new HashSet<ContainerId>(); Set<ContainerId> removedContainers = new HashSet<ContainerId>();
pendingContainersToRemove.addAll(containerIds); pendingContainersToRemove.addAll(containerIds);
@ -720,13 +720,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removedContainers.add(containerId); removedContainers.add(containerId);
iter.remove(); iter.remove();
} }
pendingCompletedContainers.remove(containerId);
} }
if (!removedContainers.isEmpty()) { if (!removedContainers.isEmpty()) {
LOG.info("Removed completed containers from NM context: " LOG.info("Removed completed containers from NM context: "
+ removedContainers); + removedContainers);
} }
pendingCompletedContainers.clear();
} }
private void trackAppsForKeepAlive(List<ApplicationId> appIds) { private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@ -1301,6 +1301,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void run() { public void run() {
int lastHeartbeatID = 0; int lastHeartbeatID = 0;
boolean missedHearbeat = false;
while (!isStopped) { while (!isStopped) {
// Send heartbeat // Send heartbeat
try { try {
@ -1354,6 +1355,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removeOrTrackCompletedContainersFromContext(response removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM()); .getContainersToBeRemovedFromNM());
// If the last heartbeat was missed, it is possible that the
// RM saw this one as a duplicate and did not process it.
// If so, we can fail to notify the RM of these completed containers
// on the next heartbeat if we clear pendingCompletedContainers.
// If it wasn't a duplicate, the only impact is we might notify
// the RM twice, which it can handle.
if (!missedHearbeat) {
pendingCompletedContainers.clear();
} else {
LOG.info("skipped clearing pending completed containers due to " +
"missed heartbeat");
missedHearbeat = false;
}
logAggregationReportForAppsTempList.clear(); logAggregationReportForAppsTempList.clear();
lastHeartbeatID = response.getResponseId(); lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response
@ -1433,6 +1448,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// TODO Better error handling. Thread can die with the rest of the // TODO Better error handling. Thread can die with the rest of the
// NM still running. // NM still running.
LOG.error("Caught exception in status-updater", e); LOG.error("Caught exception in status-updater", e);
missedHearbeat = true;
} finally { } finally {
synchronized (heartbeatMonitor) { synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?

View File

@ -721,15 +721,11 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
} else if (heartBeatID == 2 || heartBeatID == 3) { } else if (heartBeatID == 2 || heartBeatID == 3) {
List<ContainerStatus> statuses = List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses(); request.getNodeStatus().getContainersStatuses();
if (heartBeatID == 2) { // NM should send completed containers on heartbeat 2,
// NM should send completed containers again, since the last // since heartbeat 1 was lost. It will send them again on
// heartbeat is lost. // heartbeat 3, because it does not clear them if the previous
// heartbeat was lost in case the RM treated it as a duplicate.
Assert.assertEquals(4, statuses.size()); Assert.assertEquals(4, statuses.size());
} else {
// NM should not send completed containers again, since the last
// heartbeat is successful.
Assert.assertEquals(2, statuses.size());
}
Assert.assertEquals(4, context.getContainers().size()); Assert.assertEquals(4, context.getContainers().size());
boolean container2Exist = false, container3Exist = false, boolean container2Exist = false, container3Exist = false,
@ -760,14 +756,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
container5Exist = true; container5Exist = true;
} }
} }
if (heartBeatID == 2) {
Assert.assertTrue(container2Exist && container3Exist Assert.assertTrue(container2Exist && container3Exist
&& container4Exist && container5Exist); && container4Exist && container5Exist);
} else {
// NM do not send completed containers again
Assert.assertTrue(container2Exist && !container3Exist
&& container4Exist && !container5Exist);
}
if (heartBeatID == 3) { if (heartBeatID == 3) {
finishedContainersPulledByAM.add(containerStatus3.getContainerId()); finishedContainersPulledByAM.add(containerStatus3.getContainerId());