YARN-4771. Some containers can be skipped during log aggregation after NM
restart. Contributed by Jason Lowe and Jim Brennan.
(cherry picked from commit ac5f21dbef
)
This commit is contained in:
parent
1ae72d2438
commit
7ec692aa83
|
@ -776,8 +776,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
while (i.hasNext()) {
|
while (i.hasNext()) {
|
||||||
Entry<ContainerId, Long> mapEntry = i.next();
|
Entry<ContainerId, Long> mapEntry = i.next();
|
||||||
ContainerId cid = mapEntry.getKey();
|
ContainerId cid = mapEntry.getKey();
|
||||||
if (mapEntry.getValue() < currentTime) {
|
if (mapEntry.getValue() >= currentTime) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
if (!context.getContainers().containsKey(cid)) {
|
if (!context.getContainers().containsKey(cid)) {
|
||||||
|
ApplicationId appId =
|
||||||
|
cid.getApplicationAttemptId().getApplicationId();
|
||||||
|
if (isApplicationStopped(appId)) {
|
||||||
i.remove();
|
i.remove();
|
||||||
try {
|
try {
|
||||||
context.getNMStateStore().removeContainer(cid);
|
context.getNMStateStore().removeContainer(cid);
|
||||||
|
@ -785,8 +790,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
|
||||||
LOG.error("Unable to remove container " + cid + " in store", e);
|
LOG.error("Unable to remove container " + cid + " in store", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -931,9 +931,8 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
||||||
public void testRecentlyFinishedContainers() throws Exception {
|
public void testRecentlyFinishedContainers() throws Exception {
|
||||||
NodeManager nm = new NodeManager();
|
NodeManager nm = new NodeManager();
|
||||||
YarnConfiguration conf = new YarnConfiguration();
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
conf.set(
|
conf.setInt(NodeStatusUpdaterImpl.
|
||||||
NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
|
YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, 1);
|
||||||
"10000");
|
|
||||||
nm.init(conf);
|
nm.init(conf);
|
||||||
NodeStatusUpdaterImpl nodeStatusUpdater =
|
NodeStatusUpdaterImpl nodeStatusUpdater =
|
||||||
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
|
(NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
|
||||||
|
@ -948,18 +947,17 @@ public class TestNodeStatusUpdater extends NodeManagerTestBase {
|
||||||
nodeStatusUpdater.addCompletedContainer(cId);
|
nodeStatusUpdater.addCompletedContainer(cId);
|
||||||
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
|
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
|
||||||
|
|
||||||
|
// verify container remains even after expiration if app
|
||||||
|
// is still active
|
||||||
nm.getNMContext().getContainers().remove(cId);
|
nm.getNMContext().getContainers().remove(cId);
|
||||||
long time1 = System.currentTimeMillis();
|
Thread.sleep(10);
|
||||||
int waitInterval = 15;
|
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
|
||||||
while (waitInterval-- > 0
|
Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
|
||||||
&& nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
|
|
||||||
|
// complete the application and verify container is removed
|
||||||
|
nm.getNMContext().getApplications().remove(appId);
|
||||||
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
|
nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
long time2 = System.currentTimeMillis();
|
|
||||||
// By this time the container will be removed from cache. need to verify.
|
|
||||||
Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
|
Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
|
||||||
Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 90000)
|
@Test(timeout = 90000)
|
||||||
|
|
Loading…
Reference in New Issue