YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen

(cherry picked from commit bab5bf9743)
This commit is contained in:
Jonathan Hung 2020-10-28 10:32:47 -07:00
parent fe8b22ca8f
commit f95c0824b0
2 changed files with 137 additions and 6 deletions

View File

@ -179,6 +179,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private long launchAMEndTime = 0; private long launchAMEndTime = 0;
private long scheduledTime = 0; private long scheduledTime = 0;
private long containerAllocatedTime = 0; private long containerAllocatedTime = 0;
private boolean nonWorkPreservingAMContainerFinished = false;
// Set to null initially. Will eventually get set // Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs // if an RMAppAttemptUnregistrationEvent occurs
@ -853,7 +854,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// A new allocate means the AM received the previously sent // A new allocate means the AM received the previously sent
// finishedContainers. We can ack this to NM now // finishedContainers. We can ack this to NM now
sendFinishedContainersToNM(); sendFinishedContainersToNM(finishedContainersSentToAM);
// Mark every containerStatus as being sent to AM though we may return // Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt // only the ones that belong to the current attempt
@ -1980,12 +1981,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
} }
// Ack NM to remove finished containers from context. // Ack NM to remove finished containers from context.
private void sendFinishedContainersToNM() { private void sendFinishedContainersToNM(
for (NodeId nodeId : finishedContainersSentToAM.keySet()) { Map<NodeId, List<ContainerStatus>> finishedContainers) {
for (NodeId nodeId : finishedContainers.keySet()) {
// Clear and get current values // Clear and get current values
List<ContainerStatus> currentSentContainers = List<ContainerStatus> currentSentContainers =
finishedContainersSentToAM.put(nodeId, new ArrayList<>()); finishedContainers.put(nodeId, new ArrayList<>());
List<ContainerId> containerIdList = List<ContainerId> containerIdList =
new ArrayList<>(currentSentContainers.size()); new ArrayList<>(currentSentContainers.size());
for (ContainerStatus containerStatus : currentSentContainers) { for (ContainerStatus containerStatus : currentSentContainers) {
@ -1994,7 +1996,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
containerIdList)); containerIdList));
} }
this.finishedContainersSentToAM.clear(); finishedContainers.clear();
} }
// Add am container to the list so that am container instance will be // Add am container to the list so that am container instance will be
@ -2020,7 +2022,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId, appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<>()); new ArrayList<>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus); appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
appAttempt.sendFinishedContainersToNM(); appAttempt.sendFinishedContainersToNM(
appAttempt.finishedContainersSentToAM);
// there might be some completed containers that have not been pulled
// by the AM heartbeat, explicitly add them for cleanup.
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
// mark the fact that AM container has finished so that future finished
// containers will be cleaned up without the engagement of AM containers
// (through heartbeat)
appAttempt.nonWorkPreservingAMContainerFinished = true;
} else { } else {
appAttempt.sendFinishedAMContainerToNM(nodeId, appAttempt.sendFinishedAMContainerToNM(nodeId,
containerStatus.getContainerId()); containerStatus.getContainerId());
@ -2048,6 +2059,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.getNodeId(), new ArrayList<>()); .getNodeId(), new ArrayList<>());
appAttempt.justFinishedContainers.get(containerFinishedEvent appAttempt.justFinishedContainers.get(containerFinishedEvent
.getNodeId()).add(containerFinishedEvent.getContainerStatus()); .getNodeId()).add(containerFinishedEvent.getContainerStatus());
if (appAttempt.nonWorkPreservingAMContainerFinished) {
// AM container has finished, so no more AM heartbeats to do the cleanup.
appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
}
} }
private static final class ContainerFinishedAtFinalStateTransition private static final class ContainerFinishedAtFinalStateTransition

View File

@ -643,6 +643,8 @@ public class TestRMAppAttemptTransitions {
RMContainer rmContainer = mock(RMContainerImpl.class); RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(container.getId())). when(scheduler.getRMContainer(container.getId())).
thenReturn(rmContainer); thenReturn(rmContainer);
when(container.getNodeId()).thenReturn(
BuilderUtils.newNodeId("localhost", 0));
applicationAttempt.handle( applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(), new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@ -1530,6 +1532,119 @@ public class TestRMAppAttemptTransitions {
.handle(Mockito.any(RMNodeEvent.class)); .handle(Mockito.any(RMNodeEvent.class));
} }
/**
* Check a completed container that is not yet pulled by AM heartbeat,
* is ACKed to NM for cleanup when the AM container exits.
*/
@Test
public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
application.handle(new RMAppRunningOnNodeEvent(application
.getApplicationId(), amContainer.getNodeId()));
// Complete a non-AM container
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
.getAppAttemptId(), 2);
Container container1 = mock(Container.class);
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
when(container1.getId()).thenReturn(
containerId1);
when(containerStatus1.getContainerId()).thenReturn(containerId1);
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
container1.getNodeId()));
// Verify justFinishedContainers
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
.size());
Assert.assertEquals(container1.getId(), applicationAttempt
.getJustFinishedContainers().get(0).getContainerId());
Assert.assertTrue(
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
// finish AM container to emulate AM exit event
containerStatus1 = mock(ContainerStatus.class);
ContainerId amContainerId = amContainer.getId();
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
amContainer.getNodeId()));
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
captor.getAllValues();
// Verify AM container is acked to NM via the RMNodeEvent immediately
Assert.assertEquals(amContainer.getId(),
containerPulledEvents.get(0).getContainers().get(0));
// Verify the non-AM container is acked to NM via the RMNodeEvent
Assert.assertEquals(container1.getId(),
containerPulledEvents.get(1).getContainers().get(0));
Assert.assertTrue("No container shall be added to justFinishedContainers" +
" as soon as AM container exits",
applicationAttempt.getJustFinishedContainers().isEmpty());
Assert.assertTrue(
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
}
/**
* Check a completed container is ACKed to NM for cleanup after the AM
* container has exited.
*/
@Test
public void testFinishedContainerAfterAMExit() {
Container amContainer = allocateApplicationAttempt();
launchApplicationAttempt(amContainer);
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
// finish AM container to emulate AM exit event
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
ContainerId amContainerId = amContainer.getId();
when(containerStatus1.getContainerId()).thenReturn(amContainerId);
application.handle(new RMAppRunningOnNodeEvent(application
.getApplicationId(),
amContainer.getNodeId()));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
amContainer.getNodeId()));
// Verify AM container is acked to NM via the RMNodeEvent immediately
ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
Mockito.verify(rmnodeEventHandler).handle(captor.capture());
Assert.assertEquals(amContainer.getId(),
captor.getValue().getContainers().get(0));
// Complete a non-AM container
ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
.getAppAttemptId(), 2);
Container container1 = mock(Container.class);
containerStatus1 = mock(ContainerStatus.class);
when(container1.getId()).thenReturn(containerId1);
when(containerStatus1.getContainerId()).thenReturn(containerId1);
when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
applicationAttempt.getAppAttemptId(), containerStatus1,
container1.getNodeId()));
// Verify container is acked to NM via the RMNodeEvent immediately
captor = ArgumentCaptor.forClass(
RMNodeFinishedContainersPulledByAMEvent.class);
Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
Assert.assertEquals(container1.getId(),
captor.getAllValues().get(1).getContainers().get(0));
Assert.assertTrue("No container shall be added to justFinishedContainers" +
" after AM container exited",
applicationAttempt.getJustFinishedContainers().isEmpty());
Assert.assertTrue(
getFinishedContainersSentToAM(applicationAttempt).isEmpty());
}
private static List<ContainerStatus> getFinishedContainersSentToAM( private static List<ContainerStatus> getFinishedContainersSentToAM(
RMAppAttempt applicationAttempt) { RMAppAttempt applicationAttempt) {
List<ContainerStatus> containers = new ArrayList<ContainerStatus>(); List<ContainerStatus> containers = new ArrayList<ContainerStatus>();