YARN-5204. Properly report status of killed/stopped queued containers. (Konstantinos Karanasos via asuresh)
(cherry picked from commit 3344ba70e0
)
This commit is contained in:
parent
c6f33e096e
commit
c29fbdd8ac
|
@ -175,9 +175,10 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
|||
}
|
||||
|
||||
nodeStatusUpdater.sendOutofBandHeartBeat();
|
||||
}
|
||||
} else {
|
||||
super.stopContainerInternal(containerID);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Start the execution of the given container. Also add it to the allocated
|
||||
|
@ -456,6 +457,18 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
|||
ContainerExitStatus.INVALID, this.context.getQueuingContext()
|
||||
.getQueuedContainers().get(containerID).getResource(),
|
||||
executionType);
|
||||
} else {
|
||||
// Check if part of the stopped/killed queued containers.
|
||||
for (ContainerTokenIdentifier cTokenId : this.context
|
||||
.getQueuingContext().getKilledQueuedContainers().keySet()) {
|
||||
if (cTokenId.getContainerID().equals(containerID)) {
|
||||
return BuilderUtils.newContainerStatus(containerID,
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||
this.context.getQueuingContext().getKilledQueuedContainers()
|
||||
.get(cTokenId), ContainerExitStatus.ABORTED, cTokenId
|
||||
.getResource(), cTokenId.getExecutionType());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
|
||||
|
|
|
@ -24,13 +24,13 @@ import java.util.Arrays;
|
|||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
|
@ -41,15 +41,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -58,11 +55,6 @@ import org.junit.Test;
|
|||
* Class for testing the {@link QueuingContainerManagerImpl}.
|
||||
*/
|
||||
public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||
|
||||
interface HasResources {
|
||||
boolean decide(Context context, ContainerId cId);
|
||||
}
|
||||
|
||||
public TestQueuingContainerManager() throws UnsupportedFileSystemException {
|
||||
super();
|
||||
}
|
||||
|
@ -78,18 +70,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
|||
DeletionService delSrvc) {
|
||||
return new QueuingContainerManagerImpl(context, exec, delSrvc,
|
||||
nodeStatusUpdater, metrics, dirsHandler) {
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
conf.set(
|
||||
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
|
||||
MockResourceCalculatorPlugin.class.getCanonicalName());
|
||||
conf.set(
|
||||
YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
|
||||
MockResourceCalculatorProcessTree.class.getCanonicalName());
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void
|
||||
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
||||
|
@ -398,7 +378,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
|||
containerManager.startContainers(allRequests);
|
||||
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(0), ContainerState.DONE, 30);
|
||||
createContainerId(0), ContainerState.DONE, 40);
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Get container statuses. Container 0 should be killed, container 1
|
||||
|
@ -429,7 +409,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
|||
|
||||
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(2), ContainerState.DONE, 30);
|
||||
createContainerId(2), ContainerState.DONE, 40);
|
||||
Thread.sleep(5000);
|
||||
statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
|
||||
createContainerId(1)));
|
||||
|
@ -488,13 +468,12 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
|||
containerManager.startContainers(allRequests);
|
||||
|
||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||
createContainerId(0), ContainerState.DONE, 30);
|
||||
createContainerId(0), ContainerState.DONE, 40);
|
||||
Thread.sleep(5000);
|
||||
|
||||
// Get container statuses. Container 0 should be killed, container 1
|
||||
// should be queued and container 2 should be running.
|
||||
int killedContainers = 0;
|
||||
int runningContainers = 0;
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 4; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
|
@ -508,14 +487,108 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
|||
"Container killed by the ApplicationMaster")) {
|
||||
killedContainers++;
|
||||
}
|
||||
if (status.getState() ==
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
|
||||
runningContainers++;
|
||||
}
|
||||
System.out.println("\nStatus : [" + status + "]\n");
|
||||
}
|
||||
|
||||
Assert.assertEquals(2, killedContainers);
|
||||
Assert.assertEquals(2, runningContainers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start running one GUARANTEED container and queue two OPPORTUNISTIC ones.
|
||||
* Try killing one of the two queued containers.
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testStopQueuedContainer() throws Exception {
|
||||
shouldDeleteWait = true;
|
||||
containerManager.start();
|
||||
|
||||
ContainerLaunchContext containerLaunchContext =
|
||||
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||
|
||||
List<StartContainerRequest> list = new ArrayList<>();
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(2048, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.GUARANTEED)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(512, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
list.add(StartContainerRequest.newInstance(
|
||||
containerLaunchContext,
|
||||
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
|
||||
context.getNodeId(),
|
||||
user, BuilderUtils.newResource(512, 1),
|
||||
context.getContainerTokenSecretManager(), null,
|
||||
ExecutionType.OPPORTUNISTIC)));
|
||||
|
||||
StartContainersRequest allRequests =
|
||||
StartContainersRequest.newInstance(list);
|
||||
containerManager.startContainers(allRequests);
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
// Assert there is initially one container running and two queued.
|
||||
int runningContainersNo = 0;
|
||||
int queuedContainersNo = 0;
|
||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
|
||||
.newInstance(statList);
|
||||
List<ContainerStatus> containerStatuses = containerManager
|
||||
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
if (status.getState() ==
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
|
||||
runningContainersNo++;
|
||||
} else if (status.getState() ==
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) {
|
||||
queuedContainersNo++;
|
||||
}
|
||||
System.out.println("\nStatus : [" + status + "]\n");
|
||||
}
|
||||
|
||||
Assert.assertEquals(1, runningContainersNo);
|
||||
Assert.assertEquals(2, queuedContainersNo);
|
||||
|
||||
// Stop one of the two queued containers.
|
||||
StopContainersRequest stopRequest = StopContainersRequest.
|
||||
newInstance(Arrays.asList(createContainerId(1)));
|
||||
containerManager.stopContainers(stopRequest);
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
// Assert queued container got properly stopped.
|
||||
statList.clear();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
statList.add(createContainerId(i));
|
||||
}
|
||||
statRequest = GetContainerStatusesRequest.newInstance(statList);
|
||||
containerStatuses = containerManager.getContainerStatuses(statRequest)
|
||||
.getContainerStatuses();
|
||||
for (ContainerStatus status : containerStatuses) {
|
||||
if (status.getContainerId().equals(createContainerId(0))) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||
status.getState());
|
||||
} else if (status.getContainerId().equals(createContainerId(1))) {
|
||||
Assert.assertTrue(status.getDiagnostics().contains(
|
||||
"Queued container request removed"));
|
||||
} else if (status.getContainerId().equals(createContainerId(2))) {
|
||||
Assert.assertEquals(
|
||||
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||
status.getState());
|
||||
}
|
||||
System.out.println("\nStatus : [" + status + "]\n");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue