diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java index a1e3bdb20ad..38b1b07287b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java @@ -175,8 +175,9 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { } nodeStatusUpdater.sendOutofBandHeartBeat(); + } else { + super.stopContainerInternal(containerID); } - super.stopContainerInternal(containerID); } /** @@ -456,6 +457,18 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl { ContainerExitStatus.INVALID, this.context.getQueuingContext() .getQueuedContainers().get(containerID).getResource(), executionType); + } else { + // Check if part of the stopped/killed queued containers. + for (ContainerTokenIdentifier cTokenId : this.context + .getQueuingContext().getKilledQueuedContainers().keySet()) { + if (cTokenId.getContainerID().equals(containerID)) { + return BuilderUtils.newContainerStatus(containerID, + org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE, + this.context.getQueuingContext().getKilledQueuedContainers() + .get(cTokenId), ContainerExitStatus.ABORTED, cTokenId + .getResource(), cTokenId.getExecutionType()); + } + } } } return super.getContainerStatusInternal(containerID, nmTokenIdentifier); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java index 4d44d8d85db..caebef7354d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/TestQueuingContainerManager.java @@ -24,13 +24,13 @@ import java.util.Arrays; import java.util.List; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; +import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -41,15 +41,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.NMTokenIdentifier; import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor; -import org.apache.hadoop.yarn.server.nodemanager.Context; import org.apache.hadoop.yarn.server.nodemanager.DeletionService; import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest; import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl; import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor; import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin; -import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Test; @@ -58,11 +55,6 @@ import org.junit.Test; * Class for testing the {@link QueuingContainerManagerImpl}. */ public class TestQueuingContainerManager extends BaseContainerManagerTest { - - interface HasResources { - boolean decide(Context context, ContainerId cId); - } - public TestQueuingContainerManager() throws UnsupportedFileSystemException { super(); } @@ -78,18 +70,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest { DeletionService delSrvc) { return new QueuingContainerManagerImpl(context, exec, delSrvc, nodeStatusUpdater, metrics, dirsHandler) { - - @Override - public void serviceInit(Configuration conf) throws Exception { - conf.set( - YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR, - MockResourceCalculatorPlugin.class.getCanonicalName()); - conf.set( - YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, - MockResourceCalculatorProcessTree.class.getCanonicalName()); - super.serviceInit(conf); - } - @Override public void setBlockNewContainerRequests(boolean blockNewContainerRequests) { @@ -398,7 +378,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest { containerManager.startContainers(allRequests); BaseContainerManagerTest.waitForNMContainerState(containerManager, - createContainerId(0), ContainerState.DONE, 30); + createContainerId(0), ContainerState.DONE, 40); Thread.sleep(5000); // Get container statuses. Container 0 should be killed, container 1 @@ -429,7 +409,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest { // Make sure the remaining OPPORTUNISTIC container starts its execution. BaseContainerManagerTest.waitForNMContainerState(containerManager, - createContainerId(2), ContainerState.DONE, 30); + createContainerId(2), ContainerState.DONE, 40); Thread.sleep(5000); statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList( createContainerId(1))); @@ -488,13 +468,12 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest { containerManager.startContainers(allRequests); BaseContainerManagerTest.waitForNMContainerState(containerManager, - createContainerId(0), ContainerState.DONE, 30); + createContainerId(0), ContainerState.DONE, 40); Thread.sleep(5000); // Get container statuses. Container 0 should be killed, container 1 // should be queued and container 2 should be running. int killedContainers = 0; - int runningContainers = 0; List statList = new ArrayList(); for (int i = 0; i < 4; i++) { statList.add(createContainerId(i)); @@ -508,14 +487,108 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest { "Container killed by the ApplicationMaster")) { killedContainers++; } - if (status.getState() == - org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) { - runningContainers++; - } System.out.println("\nStatus : [" + status + "]\n"); } Assert.assertEquals(2, killedContainers); - Assert.assertEquals(2, runningContainers); + } + + /** + * Start running one GUARANTEED container and queue two OPPORTUNISTIC ones. + * Try killing one of the two queued containers. + * @throws Exception + */ + @Test + public void testStopQueuedContainer() throws Exception { + shouldDeleteWait = true; + containerManager.start(); + + ContainerLaunchContext containerLaunchContext = + recordFactory.newRecordInstance(ContainerLaunchContext.class); + + List list = new ArrayList<>(); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(2048, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.GUARANTEED))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + list.add(StartContainerRequest.newInstance( + containerLaunchContext, + createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER, + context.getNodeId(), + user, BuilderUtils.newResource(512, 1), + context.getContainerTokenSecretManager(), null, + ExecutionType.OPPORTUNISTIC))); + + StartContainersRequest allRequests = + StartContainersRequest.newInstance(list); + containerManager.startContainers(allRequests); + + Thread.sleep(2000); + + // Assert there is initially one container running and two queued. + int runningContainersNo = 0; + int queuedContainersNo = 0; + List statList = new ArrayList(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + GetContainerStatusesRequest statRequest = GetContainerStatusesRequest + .newInstance(statList); + List containerStatuses = containerManager + .getContainerStatuses(statRequest).getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) { + runningContainersNo++; + } else if (status.getState() == + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) { + queuedContainersNo++; + } + System.out.println("\nStatus : [" + status + "]\n"); + } + + Assert.assertEquals(1, runningContainersNo); + Assert.assertEquals(2, queuedContainersNo); + + // Stop one of the two queued containers. + StopContainersRequest stopRequest = StopContainersRequest. + newInstance(Arrays.asList(createContainerId(1))); + containerManager.stopContainers(stopRequest); + + Thread.sleep(2000); + + // Assert queued container got properly stopped. + statList.clear(); + for (int i = 0; i < 3; i++) { + statList.add(createContainerId(i)); + } + statRequest = GetContainerStatusesRequest.newInstance(statList); + containerStatuses = containerManager.getContainerStatuses(statRequest) + .getContainerStatuses(); + for (ContainerStatus status : containerStatuses) { + if (status.getContainerId().equals(createContainerId(0))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.RUNNING, + status.getState()); + } else if (status.getContainerId().equals(createContainerId(1))) { + Assert.assertTrue(status.getDiagnostics().contains( + "Queued container request removed")); + } else if (status.getContainerId().equals(createContainerId(2))) { + Assert.assertEquals( + org.apache.hadoop.yarn.api.records.ContainerState.QUEUED, + status.getState()); + } + System.out.println("\nStatus : [" + status + "]\n"); + } } }