YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen

(cherry picked from commit bab5bf9743)
(cherry picked from commit f95c0824b0)
(cherry picked from commit d0104e72c5)
This commit is contained in:
Jonathan Hung 2020-10-28 10:32:47 -07:00
parent 34915b0373
commit 42fab7897a
2 changed files with 137 additions and 6 deletions

View File

@ -178,6 +178,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private long launchAMEndTime = 0;
private long scheduledTime = 0;
private long containerAllocatedTime = 0;
private boolean nonWorkPreservingAMContainerFinished = false;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@ -854,7 +855,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// A new allocate means the AM received the previously sent
// finishedContainers. We can ack this to NM now
sendFinishedContainersToNM();
sendFinishedContainersToNM(finishedContainersSentToAM);
// Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt
@ -1986,12 +1987,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
// Ack NM to remove finished containers from context.
private void sendFinishedContainersToNM() {
for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
private void sendFinishedContainersToNM(
Map<NodeId, List<ContainerStatus>> finishedContainers) {
for (NodeId nodeId : finishedContainers.keySet()) {
// Clear and get current values
List<ContainerStatus> currentSentContainers =
finishedContainersSentToAM.put(nodeId, new ArrayList<>());
finishedContainers.put(nodeId, new ArrayList<>());
List<ContainerId> containerIdList =
new ArrayList<>(currentSentContainers.size());
for (ContainerStatus containerStatus : currentSentContainers) {
@ -2000,7 +2002,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
containerIdList));
}
this.finishedContainersSentToAM.clear();
finishedContainers.clear();
}
// Add am container to the list so that am container instance will be
@ -2026,7 +2028,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
appAttempt.sendFinishedContainersToNM();
appAttempt.sendFinishedContainersToNM(
appAttempt.finishedContainersSentToAM);
// there might be some completed containers that have not been pulled
// by the AM heartbeat, explicitly add them for cleanup.
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
// mark the fact that AM container has finished so that future finished
// containers will be cleaned up without the engagement of AM containers
// (through heartbeat)
appAttempt.nonWorkPreservingAMContainerFinished = true;
} else {
appAttempt.sendFinishedAMContainerToNM(nodeId,
containerStatus.getContainerId());
@ -2054,6 +2065,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.getNodeId(), new ArrayList<>());
appAttempt.justFinishedContainers.get(containerFinishedEvent
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
if (appAttempt.nonWorkPreservingAMContainerFinished) {
// AM container has finished, so no more AM heartbeats to do the cleanup.
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
}
}
private static final class ContainerFinishedAtFinalStateTransition

View File

@ -649,6 +649,8 @@ public class TestRMAppAttemptTransitions {
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(container.getId())).
thenReturn(rmContainer);
when(container.getNodeId()).thenReturn(
BuilderUtils.newNodeId("localhost", 0));
applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@ -1536,6 +1538,119 @@ public class TestRMAppAttemptTransitions {
.handle(Mockito.any(RMNodeEvent.class));
}
/**
* Check a completed container that is not yet pulled by AM heartbeat,
* is ACKed to NM for cleanup when the AM container exits.
*/
@Test
public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
application.handle(new RMAppRunningOnNodeEvent(application
.getApplicationId(), amContainer.getNodeId()));
// Complete a non-AM container
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
.getAppAttemptId(), 2);
Container container1 = mock(Container.class);
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
when(container1.getId()).thenReturn(
containerId1);
when(containerStatus1.getContainerId()).thenReturn(containerId1);
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
container1.getNodeId()));
// Verify justFinishedContainers
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
.size());
Assert.assertEquals(container1.getId(), applicationAttempt
.getJustFinishedContainers().get(0).getContainerId());
Assert.assertTrue(
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
// finish AM container to emulate AM exit event
containerStatus1 = mock(ContainerStatus.class);
ContainerId amContainerId = amContainer.getId();
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
amContainer.getNodeId()));
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
captor.getAllValues();
// Verify AM container is acked to NM via the RMNodeEvent immediately
Assert.assertEquals(amContainer.getId(),
containerPulledEvents.get(0).getContainers().get(0));
// Verify the non-AM container is acked to NM via the RMNodeEvent
Assert.assertEquals(container1.getId(),
containerPulledEvents.get(1).getContainers().get(0));
Assert.assertTrue("No container shall be added to justFinishedContainers" +
" as soon as AM container exits",
applicationAttempt.getJustFinishedContainers().isEmpty());
Assert.assertTrue(
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
}
/**
* Check a completed container is ACKed to NM for cleanup after the AM
* container has exited.
*/
@Test
public void testFinishedContainerAfterAMExit() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
// finish AM container to emulate AM exit event
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
ContainerId amContainerId = amContainer.getId();
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
application.handle(new RMAppRunningOnNodeEvent(application
.getApplicationId(),
amContainer.getNodeId()));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
amContainer.getNodeId()));
// Verify AM container is acked to NM via the RMNodeEvent immediately
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
Mockito.verify(rmnodeEventHandler).handle(captor.capture());
Assert.assertEquals(amContainer.getId(),
captor.getValue().getContainers().get(0));
// Complete a non-AM container
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
.getAppAttemptId(), 2);
Container container1 = mock(Container.class);
containerStatus1 = mock(ContainerStatus.class);
when(container1.getId()).thenReturn(containerId1);
when(containerStatus1.getContainerId()).thenReturn(containerId1);
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
container1.getNodeId()));
// Verify container is acked to NM via the RMNodeEvent immediately
captor = ArgumentCaptor.forClass(
RMNodeFinishedContainersPulledByAMEvent.class);
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
Assert.assertEquals(container1.getId(),
captor.getAllValues().get(1).getContainers().get(0));
Assert.assertTrue("No container shall be added to justFinishedContainers" +
" after AM container exited",
applicationAttempt.getJustFinishedContainers().isEmpty());
Assert.assertTrue(
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
}
private static List<ContainerStatus> getFinishedContainersSentToAM(
RMAppAttempt applicationAttempt) {
List<ContainerStatus> containers = new ArrayList<ContainerStatus>();