YARN-4862. Handle duplicate completed containers in RMNodeImpl. Contributed by Rohith Sharma K S

(cherry picked from commit 352cbaa7a5)
This commit is contained in:
Jason Lowe 2016-11-03 14:03:56 +00:00
parent 0c7caba087
commit e2917180ee
4 changed files with 140 additions and 9 deletions

View File

@ -141,6 +141,10 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private final Set<ContainerId> launchedContainers = private final Set<ContainerId> launchedContainers =
new HashSet<ContainerId>(); new HashSet<ContainerId>();
/* track completed container globally */
private final Set<ContainerId> completedContainers =
new HashSet<ContainerId>();
/* set of containers that need to be cleaned */ /* set of containers that need to be cleaned */
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>( private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator()); new ContainerIdComparator());
@ -578,6 +582,7 @@ public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response
response.addContainersToBeRemovedFromNM( response.addContainersToBeRemovedFromNM(
new ArrayList<ContainerId>(this.containersToBeRemovedFromNM)); new ArrayList<ContainerId>(this.containersToBeRemovedFromNM));
response.addAllContainersToSignal(this.containersToSignal); response.addAllContainersToSignal(this.containersToSignal);
this.completedContainers.removeAll(this.containersToBeRemovedFromNM);
this.containersToClean.clear(); this.containersToClean.clear();
this.finishedApplications.clear(); this.finishedApplications.clear();
this.containersToSignal.clear(); this.containersToSignal.clear();
@ -1287,6 +1292,11 @@ public Set<ContainerId> getLaunchedContainers() {
return this.launchedContainers; return this.launchedContainers;
} }
@VisibleForTesting
public Set<ContainerId> getCompletedContainers() {
return this.completedContainers;
}
@Override @Override
public Set<String> getNodeLabels() { public Set<String> getNodeLabels() {
RMNodeLabelsManager nlm = context.getNodeLabelManager(); RMNodeLabelsManager nlm = context.getNodeLabelManager();
@ -1329,7 +1339,7 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
// containers. // containers.
List<ContainerStatus> newlyLaunchedContainers = List<ContainerStatus> newlyLaunchedContainers =
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
List<ContainerStatus> completedContainers = List<ContainerStatus> newlyCompletedContainers =
new ArrayList<ContainerStatus>(); new ArrayList<ContainerStatus>();
int numRemoteRunningContainers = 0; int numRemoteRunningContainers = 0;
for (ContainerStatus remoteContainer : containerStatuses) { for (ContainerStatus remoteContainer : containerStatuses) {
@ -1385,15 +1395,25 @@ private void handleContainerStatus(List<ContainerStatus> containerStatuses) {
} }
// Completed containers should also include the OPPORTUNISTIC containers // Completed containers should also include the OPPORTUNISTIC containers
// so that the AM gets properly notified. // so that the AM gets properly notified.
completedContainers.add(remoteContainer); if (completedContainers.add(containerId)) {
newlyCompletedContainers.add(remoteContainer);
}
} }
} }
completedContainers.addAll(findLostContainers(
numRemoteRunningContainers, containerStatuses));
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) { List<ContainerStatus> lostContainers =
findLostContainers(numRemoteRunningContainers, containerStatuses);
for (ContainerStatus remoteContainer : lostContainers) {
ContainerId containerId = remoteContainer.getContainerId();
if (completedContainers.add(containerId)) {
newlyCompletedContainers.add(remoteContainer);
}
}
if (newlyLaunchedContainers.size() != 0
|| newlyCompletedContainers.size() != 0) {
nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers, nodeUpdateQueue.add(new UpdatedContainerInfo(newlyLaunchedContainers,
completedContainers)); newlyCompletedContainers));
} }
} }

View File

@ -73,6 +73,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
@ -909,16 +910,18 @@ protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
* Process completed container list. * Process completed container list.
* @param completedContainers Extracted list of completed containers * @param completedContainers Extracted list of completed containers
* @param releasedResources Reference resource object for completed containers * @param releasedResources Reference resource object for completed containers
* @param nodeId NodeId corresponding to the NodeManager
* @return The total number of released containers * @return The total number of released containers
*/ */
protected int updateCompletedContainers(List<ContainerStatus> protected int updateCompletedContainers(List<ContainerStatus>
completedContainers, Resource releasedResources) { completedContainers, Resource releasedResources, NodeId nodeId) {
int releasedContainers = 0; int releasedContainers = 0;
List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
for (ContainerStatus completedContainer : completedContainers) { for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId(); ContainerId containerId = completedContainer.getContainerId();
LOG.debug("Container FINISHED: " + containerId); LOG.debug("Container FINISHED: " + containerId);
RMContainer container = getRMContainer(containerId); RMContainer container = getRMContainer(containerId);
completedContainer(getRMContainer(containerId), completedContainer(container,
completedContainer, RMContainerEventType.FINISHED); completedContainer, RMContainerEventType.FINISHED);
if (container != null) { if (container != null) {
releasedContainers++; releasedContainers++;
@ -930,8 +933,19 @@ protected int updateCompletedContainers(List<ContainerStatus>
if (rrs != null) { if (rrs != null) {
Resources.addTo(releasedResources, rrs); Resources.addTo(releasedResources, rrs);
} }
} else {
// Add containers which are untracked by RM.
untrackedContainerIdList.add(containerId);
} }
} }
// Acknowledge NM to remove RM-untracked-containers from NM context.
if (!untrackedContainerIdList.isEmpty()) {
this.rmContext.getDispatcher().getEventHandler()
.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
untrackedContainerIdList));
}
return releasedContainers; return releasedContainers;
} }
@ -977,7 +991,7 @@ protected synchronized void nodeUpdate(RMNode nm) {
// Process completed containers // Process completed containers
Resource releasedResources = Resource.newInstance(0, 0); Resource releasedResources = Resource.newInstance(0, 0);
int releasedContainers = updateCompletedContainers(completedContainers, int releasedContainers = updateCompletedContainers(completedContainers,
releasedResources); releasedResources, nm.getNodeID());
// If the node is decommissioning, send an update to have the total // If the node is decommissioning, send an update to have the total
// resource equal to the used resource, so no available resource to // resource equal to the used resource, so no available resource to
@ -1004,4 +1018,5 @@ protected synchronized void nodeUpdate(RMNode nm) {
" availableResource: " + node.getUnallocatedResource()); " availableResource: " + node.getUnallocatedResource());
} }
} }
} }

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeFinishedContainersPulledByAMEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
@ -1065,4 +1066,43 @@ public void testDisappearingContainer() {
Assert.assertTrue("second container not running", Assert.assertTrue("second container not running",
node.getLaunchedContainers().contains(cid2)); node.getLaunchedContainers().contains(cid2));
} }
@Test
public void testForHandlingDuplicatedCompltedContainers() {
// Start the node
node.handle(new RMNodeStartedEvent(null, null, null));
// Add info to the queue first
node.setNextHeartBeat(false);
ContainerId completedContainerId1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(BuilderUtils.newApplicationId(0, 0), 0), 0);
RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
doReturn(completedContainerId1).when(containerStatus1).getContainerId();
doReturn(Collections.singletonList(containerStatus1)).when(statusEvent1)
.getContainers();
verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class));
node.handle(statusEvent1);
verify(scheduler, times(1)).handle(any(NodeUpdateSchedulerEvent.class));
Assert.assertEquals(1, node.getQueueSize());
Assert.assertEquals(1, node.getCompletedContainers().size());
// test for duplicate entries
node.handle(statusEvent1);
Assert.assertEquals(1, node.getQueueSize());
// send clean up container event
node.handle(new RMNodeFinishedContainersPulledByAMEvent(node.getNodeID(),
Collections.singletonList(completedContainerId1)));
NodeHeartbeatResponse hbrsp =
Records.newRecord(NodeHeartbeatResponse.class);
node.updateNodeHeartbeatResponseForCleanup(hbrsp);
Assert.assertEquals(1, hbrsp.getContainersToBeRemovedFromNM().size());
Assert.assertEquals(0, node.getCompletedContainers().size());
}
} }

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.yarn.server.resourcemanager; package org.apache.hadoop.yarn.server.resourcemanager;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File; import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
@ -30,6 +32,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -1917,4 +1920,57 @@ public void tearDown() {
DefaultMetricsSystem.shutdown(); DefaultMetricsSystem.shutdown();
} }
} }
@Test(timeout = 60000)
public void testNodeHeartBeatResponseForUnknownContainerCleanUp()
throws Exception {
Configuration conf = new Configuration();
rm = new MockRM(conf);
rm.init(conf);
rm.start();
MockNM nm1 = rm.registerNode("host1:1234", 5120);
rm.drainEvents();
// send 1st heartbeat
nm1.nodeHeartbeat(true);
// Create 2 unknown containers tracked by NM
ApplicationId applicationId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId applicationAttemptId = BuilderUtils
.newApplicationAttemptId(applicationId, 1);
ContainerId cid1 = BuilderUtils.newContainerId(applicationAttemptId, 2);
ContainerId cid2 = BuilderUtils.newContainerId(applicationAttemptId, 3);
ArrayList<ContainerStatus> containerStats =
new ArrayList<ContainerStatus>();
containerStats.add(
ContainerStatus.newInstance(cid1, ContainerState.COMPLETE, "", -1));
containerStats.add(
ContainerStatus.newInstance(cid2, ContainerState.COMPLETE, "", -1));
Map<ApplicationId, List<ContainerStatus>> conts =
new HashMap<ApplicationId, List<ContainerStatus>>();
conts.put(applicationAttemptId.getApplicationId(), containerStats);
// add RMApp into context.
RMApp app1 = mock(RMApp.class);
when(app1.getApplicationId()).thenReturn(applicationId);
rm.getRMContext().getRMApps().put(applicationId, app1);
// Send unknown container status in heartbeat
nm1.nodeHeartbeat(conts, true);
rm.drainEvents();
int containersToBeRemovedFromNM = 0;
while (true) {
NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
rm.drainEvents();
containersToBeRemovedFromNM +=
nodeHeartbeat.getContainersToBeRemovedFromNM().size();
// asserting for 2 since two unknown containers status has been sent
if (containersToBeRemovedFromNM == 2) {
break;
}
}
}
} }