YARN-4771. Some containers can be skipped during log aggregation after NM

restart. Contributed by Jason Lowe and Jim Brennan.

(cherry picked from commit ac5f21dbef)
This commit is contained in:
Eric Badger 2020-07-24 22:55:08 +00:00
parent 0fb7c48acb
commit 7350773b69
2 changed files with 18 additions and 17 deletions

View File

@ -776,8 +776,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
while (i.hasNext()) { while (i.hasNext()) {
Entry<ContainerId, Long> mapEntry = i.next(); Entry<ContainerId, Long> mapEntry = i.next();
ContainerId cid = mapEntry.getKey(); ContainerId cid = mapEntry.getKey();
if (mapEntry.getValue() < currentTime) { if (mapEntry.getValue() >= currentTime) {
break;
}
if (!context.getContainers().containsKey(cid)) { if (!context.getContainers().containsKey(cid)) {
ApplicationId appId =
cid.getApplicationAttemptId().getApplicationId();
if (isApplicationStopped(appId)) {
i.remove(); i.remove();
try { try {
context.getNMStateStore().removeContainer(cid); context.getNMStateStore().removeContainer(cid);
@ -785,8 +790,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
LOG.error("Unable to remove container " + cid + " in store", e); LOG.error("Unable to remove container " + cid + " in store", e);
} }
} }
} else {
break;
} }
} }
} }

View File

@ -903,9 +903,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
public void testRecentlyFinishedContainers() throws Exception { public void testRecentlyFinishedContainers() throws Exception {
NodeManager nm = new NodeManager(); NodeManager nm = new NodeManager();
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
conf.set( conf.setInt(NodeStatusUpdaterImpl.
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 1);
"10000");
nm.init(conf); nm.init(conf);
NodeStatusUpdaterImpl nodeStatusUpdater = NodeStatusUpdaterImpl nodeStatusUpdater =
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
@ -920,18 +919,17 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
nodeStatusUpdater.addCompletedContainer(cId); nodeStatusUpdater.addCompletedContainer(cId);
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId)); Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
// verify container remains even after expiration if app
// is still active
nm.getNMContext().getContainers().remove(cId); nm.getNMContext().getContainers().remove(cId);
long time1 = System.currentTimeMillis(); Thread.sleep(10);
int waitInterval = 15; nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
while (waitInterval-- > 0 Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
&& nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
// complete the application and verify container is removed
nm.getNMContext().getApplications().remove(appId);
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache(); nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
Thread.sleep(1000);
}
long time2 = System.currentTimeMillis();
// By this time the container will be removed from cache. need to verify.
Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId)); Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
} }
@Test(timeout = 90000) @Test(timeout = 90000)