YARN-4771. Some containers can be skipped during log aggregation after NM

restart. Contributed by Jason Lowe and Jim Brennan.
This commit is contained in:
Eric Badger 2020-07-24 22:35:16 +00:00
parent e60096c377
commit ac5f21dbef
2 changed files with 18 additions and 17 deletions

View File

@ -777,8 +777,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
while (i.hasNext()) {
Entry<ContainerId, Long> mapEntry = i.next();
ContainerId cid = mapEntry.getKey();
if (mapEntry.getValue() < currentTime) {
if (!context.getContainers().containsKey(cid)) {
if (mapEntry.getValue() >= currentTime) {
break;
}
if (!context.getContainers().containsKey(cid)) {
ApplicationId appId =
cid.getApplicationAttemptId().getApplicationId();
if (isApplicationStopped(appId)) {
i.remove();
try {
context.getNMStateStore().removeContainer(cid);
@ -786,8 +791,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.error("Unable to remove container " + cid + " in store", e);
}
}
} else {
break;
}
}
}

View File

@ -931,9 +931,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
public void testRecentlyFinishedContainers() throws Exception {
NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration();
conf.set(
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
"10000");
conf.setInt(NodeStatusUpdaterImpl.
YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 1);
nm.init(conf);
NodeStatusUpdaterImpl nodeStatusUpdater =
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
@ -948,18 +947,17 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
nodeStatusUpdater.addCompletedContainer(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
// verify container remains even after expiration if app
// is still active
nm.getNMContext().getContainers().remove(cId);
long time1 = System.currentTimeMillis();
int waitInterval = 15;
while (waitInterval-- > 0
&& nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Thread.sleep(1000);
}
long time2 = System.currentTimeMillis();
// By this time the container will be removed from cache. need to verify.
Thread.sleep(10);
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
// complete the application and verify container is removed
nm.getNMContext().getApplications().remove(appId);
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
}
@Test(timeout = 90000)