YARN-10393. MR job live lock caused by completed state container leak in heartbeat between node manager and RM. Contributed by zhenzhao wang and Jim Brennan

This commit is contained in:
Adam Antal 2020-10-07 16:46:22 +02:00
parent 0bf270d2ed
commit 1c0fe2eb20
2 changed files with 25 additions and 19 deletions

View File

@ -659,7 +659,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@VisibleForTesting @VisibleForTesting
@Private @Private
public void removeOrTrackCompletedContainersFromContext( public void removeOrTrackCompletedContainersFromContext(
List<ContainerId> containerIds) throws IOException { List<ContainerId> containerIds) {
Set<ContainerId> removedContainers = new HashSet<ContainerId>(); Set<ContainerId> removedContainers = new HashSet<ContainerId>();
pendingContainersToRemove.addAll(containerIds); pendingContainersToRemove.addAll(containerIds);
@ -676,13 +676,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removedContainers.add(containerId); removedContainers.add(containerId);
iter.remove(); iter.remove();
} }
pendingCompletedContainers.remove(containerId);
} }
if (!removedContainers.isEmpty()) { if (!removedContainers.isEmpty()) {
LOG.info("Removed completed containers from NM context: " LOG.info("Removed completed containers from NM context: "
+ removedContainers); + removedContainers);
} }
pendingCompletedContainers.clear();
} }
private void trackAppsForKeepAlive(List<ApplicationId> appIds) { private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@ -790,6 +790,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public void run() { public void run() {
int lastHeartbeatID = 0; int lastHeartbeatID = 0;
boolean missedHearbeat = false;
while (!isStopped) { while (!isStopped) {
// Send heartbeat // Send heartbeat
try { try {
@ -836,6 +837,20 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
removeOrTrackCompletedContainersFromContext(response removeOrTrackCompletedContainersFromContext(response
.getContainersToBeRemovedFromNM()); .getContainersToBeRemovedFromNM());
// If the last heartbeat was missed, it is possible that the
// RM saw this one as a duplicate and did not process it.
// If so, we can fail to notify the RM of these completed containers
// on the next heartbeat if we clear pendingCompletedContainers.
// If it wasn't a duplicate, the only impact is we might notify
// the RM twice, which it can handle.
if (!missedHearbeat) {
pendingCompletedContainers.clear();
} else {
LOG.info("skipped clearing pending completed containers due to " +
"missed heartbeat");
missedHearbeat = false;
}
logAggregationReportForAppsTempList.clear(); logAggregationReportForAppsTempList.clear();
lastHeartbeatID = response.getResponseId(); lastHeartbeatID = response.getResponseId();
List<ContainerId> containersToCleanup = response List<ContainerId> containersToCleanup = response
@ -911,6 +926,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
// TODO Better error handling. Thread can die with the rest of the // TODO Better error handling. Thread can die with the rest of the
// NM still running. // NM still running.
LOG.error("Caught exception in status-updater", e); LOG.error("Caught exception in status-updater", e);
missedHearbeat = true;
} finally { } finally {
synchronized (heartbeatMonitor) { synchronized (heartbeatMonitor) {
nextHeartBeatInterval = nextHeartBeatInterval <= 0 ? nextHeartBeatInterval = nextHeartBeatInterval <= 0 ?

View File

@ -692,15 +692,11 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
} else if (heartBeatID == 2 || heartBeatID == 3) { } else if (heartBeatID == 2 || heartBeatID == 3) {
List<ContainerStatus> statuses = List<ContainerStatus> statuses =
request.getNodeStatus().getContainersStatuses(); request.getNodeStatus().getContainersStatuses();
if (heartBeatID == 2) { // NM should send completed containers on heartbeat 2,
// NM should send completed containers again, since the last // since heartbeat 1 was lost. It will send them again on
// heartbeat is lost. // heartbeat 3, because it does not clear them if the previous
Assert.assertEquals(4, statuses.size()); // heartbeat was lost in case the RM treated it as a duplicate.
} else { Assert.assertEquals(4, statuses.size());
// NM should not send completed containers again, since the last
// heartbeat is successful.
Assert.assertEquals(2, statuses.size());
}
Assert.assertEquals(4, context.getContainers().size()); Assert.assertEquals(4, context.getContainers().size());
boolean container2Exist = false, container3Exist = false, boolean container2Exist = false, container3Exist = false,
@ -731,14 +727,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
container5Exist = true; container5Exist = true;
} }
} }
if (heartBeatID == 2) { Assert.assertTrue(container2Exist && container3Exist
Assert.assertTrue(container2Exist && container3Exist && container4Exist && container5Exist);
&& container4Exist && container5Exist);
} else {
// NM do not send completed containers again
Assert.assertTrue(container2Exist && !container3Exist
&& container4Exist && !container5Exist);
}
if (heartBeatID == 3) { if (heartBeatID == 3) {
finishedContainersPulledByAM.add(containerStatus3.getContainerId()); finishedContainersPulledByAM.add(containerStatus3.getContainerId());