diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index c668198f47a..9b0198abab3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -141,6 +141,10 @@ public class RMNodeImpl implements RMNode, EventHandler { private final Set launchedContainers = new HashSet(); + /* track completed container globally */ + private final Set completedContainers = + new HashSet(); + /* set of containers that need to be cleaned */ private final Set containersToClean = new TreeSet( new ContainerIdComparator()); @@ -578,6 +582,7 @@ public class RMNodeImpl implements RMNode, EventHandler { response.addContainersToBeRemovedFromNM( new ArrayList(this.containersToBeRemovedFromNM)); response.addAllContainersToSignal(this.containersToSignal); + this.completedContainers.removeAll(this.containersToBeRemovedFromNM); this.containersToClean.clear(); this.finishedApplications.clear(); this.containersToSignal.clear(); @@ -1287,6 +1292,11 @@ public class RMNodeImpl implements RMNode, EventHandler { return this.launchedContainers; } + @VisibleForTesting + public Set getCompletedContainers() { + return this.completedContainers; + } + @Override public Set getNodeLabels() { RMNodeLabelsManager nlm = context.getNodeLabelManager(); @@ -1329,7 +1339,7 @@ public class RMNodeImpl implements RMNode, EventHandler { // containers. 
List newlyLaunchedContainers = new ArrayList(); - List completedContainers = + List newlyCompletedContainers = new ArrayList(); int numRemoteRunningContainers = 0; for (ContainerStatus remoteContainer : containerStatuses) { @@ -1385,15 +1395,25 @@ public class RMNodeImpl implements RMNode, EventHandler { } // Completed containers should also include the OPPORTUNISTIC containers // so that the AM gets properly notified. - completedContainers.add(remoteContainer); + if (completedContainers.add(containerId)) { + newlyCompletedContainers.add(remoteContainer); + } } } - completedContainers.addAll(findLostContainers( - numRemoteRunningContainers, containerStatuses)); - if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { + List lostContainers = + findLostContainers(numRemoteRunningContainers, containerStatuses); + for (ContainerStatus remoteContainer : lostContainers) { + ContainerId containerId = remoteContainer.getContainerId(); + if (completedContainers.add(containerId)) { + newlyCompletedContainers.add(remoteContainer); + } + } + + if (newlyLaunchedContainers.size() != 0 + || newlyCompletedContainers.size() != 0) { nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, - completedContainers)); + newlyCompletedContainers)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index df595561745..5e6a7265ca6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -909,16 +910,18 @@ public abstract class AbstractYarnScheduler * Process completed container list. * @param completedContainers Extracted list of completed containers * @param releasedResources Reference resource object for completed containers + * @param nodeId NodeId corresponding to the NodeManager * @return The total number of released containers */ protected int updateCompletedContainers(List - completedContainers, Resource releasedResources) { + completedContainers, Resource releasedResources, NodeId nodeId) { int releasedContainers = 0; + List untrackedContainerIdList = new ArrayList(); for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); LOG.debug("Container FINISHED: " + containerId); RMContainer container = getRMContainer(containerId); - completedContainer(getRMContainer(containerId), + completedContainer(container, completedContainer, RMContainerEventType.FINISHED); if (container != null) { releasedContainers++; @@ -930,8 +933,19 @@ public abstract class AbstractYarnScheduler if (rrs != null) { 
Resources.addTo(releasedResources, rrs); } + } else { + // Add containers which are untracked by RM. + untrackedContainerIdList.add(containerId); } } + + // Acknowledge NM to remove RM-untracked-containers from NM context. + if (!untrackedContainerIdList.isEmpty()) { + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId, + untrackedContainerIdList)); + } + return releasedContainers; } @@ -977,7 +991,7 @@ public abstract class AbstractYarnScheduler // Process completed containers Resource releasedResources = Resource.newInstance(0, 0); int releasedContainers = updateCompletedContainers(completedContainers, - releasedResources); + releasedResources, nm.getNodeID()); // If the node is decommissioning, send an update to have the total // resource equal to the used resource, so no available resource to @@ -1004,4 +1018,5 @@ public abstract class AbstractYarnScheduler " availableResource: " + node.getUnallocatedResource()); } } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java index 6038b317fd8..6055afb81d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; @@ -1065,4 +1066,43 @@ public class TestRMNodeTransitions { Assert.assertTrue("second container not running", node.getLaunchedContainers().contains(cid2)); } + + @Test + public void testForHandlingDuplicatedCompltedContainers() { + // Start the node + node.handle(new RMNodeStartedEvent(null, null, null)); + // Add info to the queue first + node.setNextHeartBeat(false); + + ContainerId completedContainerId1 = BuilderUtils.newContainerId( + BuilderUtils.newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0); + + RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null); + + ContainerStatus containerStatus1 = mock(ContainerStatus.class); + + doReturn(completedContainerId1).when(containerStatus1).getContainerId(); + doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1) + .getContainers(); + + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + node.handle(statusEvent1); + verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class)); + Assert.assertEquals(1, node.getQueueSize()); + Assert.assertEquals(1, node.getCompletedContainers().size()); + + // test for duplicate entries + node.handle(statusEvent1); + Assert.assertEquals(1, node.getQueueSize()); + + // send clean up container event + node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(), + Collections.singletonList(completedContainerId1))); + + NodeHeartbeatResponse hbrsp = + Records.newRecord(NodeHeartbeatResponse.class); + 
node.updateNodeHeartbeatResponseForCleanup(hbrsp); + Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size()); + Assert.assertEquals(0, node.getCompletedContainers().size()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java index 298e673cbd6..5d579174180 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceTrackerService.java @@ -19,9 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager; import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import java.io.File; import java.io.FileOutputStream; @@ -30,6 +32,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -1917,4 +1920,57 @@ public class TestResourceTrackerService extends NodeLabelTestBase { DefaultMetricsSystem.shutdown(); } } + + @Test(timeout = 60000) + public void testNodeHeartBeatResponseForUnknownContainerCleanUp() + throws Exception { + Configuration conf = new Configuration(); + rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + MockNM nm1 = rm.registerNode("host1:1234", 5120); + 
rm.drainEvents(); + + // send 1st heartbeat + nm1.nodeHeartbeat(true); + + // Create 2 unknown containers tracked by NM + ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId applicationAttemptId = BuilderUtils + .newApplicationAttemptId(applicationId, 1); + ContainerId cid1 = BuilderUtils.newContainerId(applicationAttemptId, 2); + ContainerId cid2 = BuilderUtils.newContainerId(applicationAttemptId, 3); + ArrayList containerStats = + new ArrayList(); + containerStats.add( + ContainerStatus.newInstance(cid1, ContainerState.COMPLETE, "", -1)); + containerStats.add( + ContainerStatus.newInstance(cid2, ContainerState.COMPLETE, "", -1)); + + Map<ApplicationId, List<ContainerStatus>> conts = + new HashMap<ApplicationId, List<ContainerStatus>>(); + conts.put(applicationAttemptId.getApplicationId(), containerStats); + + // add RMApp into context. + RMApp app1 = mock(RMApp.class); + when(app1.getApplicationId()).thenReturn(applicationId); + rm.getRMContext().getRMApps().put(applicationId, app1); + + // Send unknown container status in heartbeat + nm1.nodeHeartbeat(conts, true); + rm.drainEvents(); + + int containersToBeRemovedFromNM = 0; + while (true) { + NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true); + rm.drainEvents(); + containersToBeRemovedFromNM += + nodeHeartbeat.getContainersToBeRemovedFromNM().size(); + // expect 2, since statuses of two unknown containers were sent + if (containersToBeRemovedFromNM == 2) { + break; + } + } + } }