diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index a72f0d1a084..7064fa3c2bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -47,6 +47,7 @@ import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; @@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.impl.pb.RecordFactoryPBImpl; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.security.ContainerTokenIdentifier; import org.apache.hadoop.yarn.server.api.ResourceManagerConstants; import org.apache.hadoop.yarn.server.api.ResourceTracker; import org.apache.hadoop.yarn.server.api.ServerRMProxy; @@ -86,6 +88,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics; import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import 
org.apache.hadoop.yarn.util.resource.Resources; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.YarnVersionInfo; import com.google.common.annotations.VisibleForTesting; @@ -544,14 +547,56 @@ public class NodeStatusUpdaterImpl extends AbstractService implements containerStatuses.add(containerStatus); } } + + // Account for all containers that got killed while they were still queued. + pendingCompletedContainers.putAll(getKilledQueuedContainerStatuses()); + containerStatuses.addAll(pendingCompletedContainers.values()); + if (LOG.isDebugEnabled()) { LOG.debug("Sending out " + containerStatuses.size() + " container statuses: " + containerStatuses); } return containerStatuses; } - + + /** + * Build the COMPLETE/ABORTED statuses of the containers that got killed while + * they were still queued, keyed by container id. + */ + private Map<ContainerId, ContainerStatus> getKilledQueuedContainerStatuses() { + Map<ContainerId, ContainerStatus> killedQueuedContainerStatuses = + new HashMap<>(); + for (Map.Entry<ContainerTokenIdentifier, String> killedQueuedContainer : + this.context.getQueuingContext(). 
+ getKilledQueuedContainers().entrySet()) { + ContainerTokenIdentifier containerTokenId = killedQueuedContainer + .getKey(); + ContainerId containerId = containerTokenId.getContainerID(); + ContainerStatus containerStatus = BuilderUtils.newContainerStatus( + containerId, ContainerState.COMPLETE, + killedQueuedContainer.getValue(), ContainerExitStatus.ABORTED, + containerTokenId.getResource(), containerTokenId.getExecutionType()); + ApplicationId applicationId = containerId.getApplicationAttemptId() + .getApplicationId(); + if (isApplicationStopped(applicationId)) { + if (LOG.isDebugEnabled()) { + LOG.debug(applicationId + " is completing, " + " remove " + + containerId + " from NM context."); + } + this.context.getQueuingContext().getKilledQueuedContainers() + .remove(containerTokenId); + killedQueuedContainerStatuses.put(containerId, containerStatus); + } else { + if (!isContainerRecentlyStopped(containerId)) { + killedQueuedContainerStatuses.put(containerId, containerStatus); + } + } + addCompletedContainer(containerId); + } + return killedQueuedContainerStatuses; + } + private List<ApplicationId> getRunningApplications() { List<ApplicationId> runningApplications = new ArrayList<ApplicationId>(); runningApplications.addAll(this.context.getApplications().keySet()); @@ -617,6 +662,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements public void removeOrTrackCompletedContainersFromContext( List<ContainerId> containerIds) throws IOException { Set<ContainerId> removedContainers = new HashSet<ContainerId>(); + Set<ContainerId> removedNullContainers = new HashSet<ContainerId>(); pendingContainersToRemove.addAll(containerIds); Iterator<ContainerId> iter = pendingContainersToRemove.iterator(); @@ -626,6 +672,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements Container nmContainer = context.getContainers().get(containerId); if (nmContainer == null) { iter.remove(); + removedNullContainers.add(containerId); } else if (nmContainer.getContainerState().equals( org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE)) { 
context.getContainers().remove(containerId); @@ -634,6 +681,17 @@ public class NodeStatusUpdaterImpl extends AbstractService implements } } + // Remove null containers from queuing context for killed queued containers. + Iterator<ContainerTokenIdentifier> killedQueuedContIter = + context.getQueuingContext().getKilledQueuedContainers().keySet(). + iterator(); + while (killedQueuedContIter.hasNext()) { + if (removedNullContainers.contains( + killedQueuedContIter.next().getContainerID())) { + killedQueuedContIter.remove(); + } + } + if (!removedContainers.isEmpty()) { LOG.info("Removed completed containers from NM context: " + removedContainers); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index 5b1b77a721b..4f9d5a359c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -554,7 +554,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { if (containerTokenIdentifier != null) { this.context.getQueuingContext().getKilledQueuedContainers() .putIfAbsent(cInfo.getContainerTokenIdentifier(), - "Container De-queued to meet global queuing limits. " + "Container de-queued to meet NM queuing limits. 
" + "Max Queue length[" + this.queuingLimit.getMaxQueueLength() + "]"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java index 8c235eb7034..689a32754ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java @@ -65,6 +65,7 @@ import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -1077,6 +1078,126 @@ public class TestNodeStatusUpdater { Assert.assertTrue(containerIdSet.contains(runningContainerId)); } + @Test(timeout = 90000) + public void testKilledQueuedContainers() throws Exception { + NodeManager nm = new NodeManager(); + YarnConfiguration conf = new YarnConfiguration(); + conf.set( + NodeStatusUpdaterImpl + .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS, + "10000"); + nm.init(conf); + NodeStatusUpdaterImpl nodeStatusUpdater = + (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater(); + ApplicationId appId = ApplicationId.newInstance(0, 0); + ApplicationAttemptId appAttemptId = + ApplicationAttemptId.newInstance(appId, 0); + + // Add 
application to context. + nm.getNMContext().getApplications().putIfAbsent(appId, + mock(Application.class)); + + // Create a running container and add it to the context. + ContainerId runningContainerId = + ContainerId.newContainerId(appAttemptId, 1); + Token runningContainerToken = + BuilderUtils.newContainerToken(runningContainerId, "anyHost", + 1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123, + "password".getBytes(), 0); + Container runningContainer = + new ContainerImpl(conf, null, null, null, null, + BuilderUtils.newContainerTokenIdentifier(runningContainerToken), + nm.getNMContext()) { + @Override + public ContainerState getCurrentState() { + return ContainerState.RUNNING; + } + + @Override + public org.apache.hadoop.yarn.server.nodemanager.containermanager. + container.ContainerState getContainerState() { + return org.apache.hadoop.yarn.server.nodemanager.containermanager. + container.ContainerState.RUNNING; + } + }; + + nm.getNMContext().getContainers() + .put(runningContainerId, runningContainer); + + // Create two killed queued containers and add them to the queuing context. 
+ ContainerId killedQueuedContainerId1 = ContainerId.newContainerId( + appAttemptId, 2); + ContainerTokenIdentifier killedQueuedContainerTokenId1 = BuilderUtils + .newContainerTokenIdentifier(BuilderUtils.newContainerToken( + killedQueuedContainerId1, "anyHost", 1234, "anyUser", BuilderUtils + .newResource(1024, 1), 0, 123, "password".getBytes(), 0)); + ContainerId killedQueuedContainerId2 = ContainerId.newContainerId( + appAttemptId, 3); + ContainerTokenIdentifier killedQueuedContainerTokenId2 = BuilderUtils + .newContainerTokenIdentifier(BuilderUtils.newContainerToken( + killedQueuedContainerId2, "anyHost", 1234, "anyUser", BuilderUtils + .newResource(1024, 1), 0, 123, "password".getBytes(), 0)); + + nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put( + killedQueuedContainerTokenId1, "Queued container killed."); + nm.getNMContext().getQueuingContext().getKilledQueuedContainers().put( + killedQueuedContainerTokenId2, "Queued container killed."); + + List<ContainerStatus> containerStatuses = nodeStatusUpdater + .getContainerStatuses(); + + Assert.assertEquals(3, containerStatuses.size()); + + ContainerStatus runningContainerStatus = null; + ContainerStatus killedQueuedContainerStatus1 = null; + ContainerStatus killedQueuedContainerStatus2 = null; + for (ContainerStatus cStatus : containerStatuses) { + if (ContainerState.RUNNING == cStatus.getState()) { + runningContainerStatus = cStatus; + } + if (ContainerState.COMPLETE == cStatus.getState()) { + if (killedQueuedContainerId1.equals(cStatus.getContainerId())) { + killedQueuedContainerStatus1 = cStatus; + } else { + killedQueuedContainerStatus2 = cStatus; + } + } + } + + // Check that a status was reported for every container. + Assert.assertNotNull(runningContainerStatus); + Assert.assertNotNull(killedQueuedContainerStatus1); + Assert.assertNotNull(killedQueuedContainerStatus2); + + // Killed queued container should have ABORTED exit status. 
+ Assert.assertEquals(ContainerExitStatus.ABORTED, + killedQueuedContainerStatus1.getExitStatus()); + Assert.assertEquals(ContainerExitStatus.ABORTED, + killedQueuedContainerStatus2.getExitStatus()); + + // Killed queued container should appear in the recentlyStoppedContainers. + Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped( + killedQueuedContainerId1)); + Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped( + killedQueuedContainerId2)); + + // Check if killed queued containers are successfully removed from the + // queuing context. + List<ContainerId> ackedContainers = new ArrayList<ContainerId>(); + ackedContainers.add(killedQueuedContainerId1); + ackedContainers.add(killedQueuedContainerId2); + + nodeStatusUpdater.removeOrTrackCompletedContainersFromContext( + ackedContainers); + + containerStatuses = nodeStatusUpdater.getContainerStatuses(); + + // Only the running container should be in the container statuses now. + Assert.assertEquals(1, containerStatuses.size()); + Assert.assertEquals(ContainerState.RUNNING, + containerStatuses.get(0).getState()); + } + @Test(timeout = 10000) public void testCompletedContainersIsRecentlyStopped() throws Exception { NodeManager nm = new NodeManager(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index a06308924ae..4cb5a012678 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -46,6 +46,7 @@ import 
org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Resource; @@ -1341,21 +1342,28 @@ public class RMNodeImpl implements RMNode, EventHandler { // Process running containers if (remoteContainer.getState() == ContainerState.RUNNING) { - if (!launchedContainers.contains(containerId)) { - // Just launched container. RM knows about it the first time. - launchedContainers.add(containerId); - newlyLaunchedContainers.add(remoteContainer); - // Unregister from containerAllocationExpirer. - containerAllocationExpirer.unregister( - new AllocationExpirationInfo(containerId)); + // Process only GUARANTEED containers in the RM. + if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { + if (!launchedContainers.contains(containerId)) { + // Just launched container. RM knows about it the first time. + launchedContainers.add(containerId); + newlyLaunchedContainers.add(remoteContainer); + // Unregister from containerAllocationExpirer. + containerAllocationExpirer + .unregister(new AllocationExpirationInfo(containerId)); + } } } else { - // A finished container - launchedContainers.remove(containerId); + if (remoteContainer.getExecutionType() == ExecutionType.GUARANTEED) { + // A finished container + launchedContainers.remove(containerId); + // Unregister from containerAllocationExpirer. + containerAllocationExpirer + .unregister(new AllocationExpirationInfo(containerId)); + } + // Completed containers should also include the OPPORTUNISTIC containers + // so that the AM gets properly notified. completedContainers.add(remoteContainer); - // Unregister from containerAllocationExpirer. 
- containerAllocationExpirer.unregister( - new AllocationExpirationInfo(containerId)); } } if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {