YARN-5204. Properly report status of killed/stopped queued containers. (Konstantinos Karanasos via asuresh)
This commit is contained in:
parent
8c8a377cac
commit
3344ba70e0
|
@ -175,9 +175,10 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
}
|
}
|
||||||
|
|
||||||
nodeStatusUpdater.sendOutofBandHeartBeat();
|
nodeStatusUpdater.sendOutofBandHeartBeat();
|
||||||
}
|
} else {
|
||||||
super.stopContainerInternal(containerID);
|
super.stopContainerInternal(containerID);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Start the execution of the given container. Also add it to the allocated
|
* Start the execution of the given container. Also add it to the allocated
|
||||||
|
@ -456,6 +457,18 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
|
||||||
ContainerExitStatus.INVALID, this.context.getQueuingContext()
|
ContainerExitStatus.INVALID, this.context.getQueuingContext()
|
||||||
.getQueuedContainers().get(containerID).getResource(),
|
.getQueuedContainers().get(containerID).getResource(),
|
||||||
executionType);
|
executionType);
|
||||||
|
} else {
|
||||||
|
// Check if part of the stopped/killed queued containers.
|
||||||
|
for (ContainerTokenIdentifier cTokenId : this.context
|
||||||
|
.getQueuingContext().getKilledQueuedContainers().keySet()) {
|
||||||
|
if (cTokenId.getContainerID().equals(containerID)) {
|
||||||
|
return BuilderUtils.newContainerStatus(containerID,
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE,
|
||||||
|
this.context.getQueuingContext().getKilledQueuedContainers()
|
||||||
|
.get(cTokenId), ContainerExitStatus.ABORTED, cTokenId
|
||||||
|
.getResource(), cTokenId.getExecutionType());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
|
return super.getContainerStatusInternal(containerID, nmTokenIdentifier);
|
||||||
|
|
|
@ -24,13 +24,13 @@ import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
@ -41,15 +41,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorPlugin;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.MockResourceCalculatorProcessTree;
|
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -58,11 +55,6 @@ import org.junit.Test;
|
||||||
* Class for testing the {@link QueuingContainerManagerImpl}.
|
* Class for testing the {@link QueuingContainerManagerImpl}.
|
||||||
*/
|
*/
|
||||||
public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
|
|
||||||
interface HasResources {
|
|
||||||
boolean decide(Context context, ContainerId cId);
|
|
||||||
}
|
|
||||||
|
|
||||||
public TestQueuingContainerManager() throws UnsupportedFileSystemException {
|
public TestQueuingContainerManager() throws UnsupportedFileSystemException {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
@ -78,18 +70,6 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
DeletionService delSrvc) {
|
DeletionService delSrvc) {
|
||||||
return new QueuingContainerManagerImpl(context, exec, delSrvc,
|
return new QueuingContainerManagerImpl(context, exec, delSrvc,
|
||||||
nodeStatusUpdater, metrics, dirsHandler) {
|
nodeStatusUpdater, metrics, dirsHandler) {
|
||||||
|
|
||||||
@Override
|
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
|
||||||
conf.set(
|
|
||||||
YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
|
|
||||||
MockResourceCalculatorPlugin.class.getCanonicalName());
|
|
||||||
conf.set(
|
|
||||||
YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE,
|
|
||||||
MockResourceCalculatorProcessTree.class.getCanonicalName());
|
|
||||||
super.serviceInit(conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void
|
public void
|
||||||
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
setBlockNewContainerRequests(boolean blockNewContainerRequests) {
|
||||||
|
@ -398,7 +378,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
containerManager.startContainers(allRequests);
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
createContainerId(0), ContainerState.DONE, 30);
|
createContainerId(0), ContainerState.DONE, 40);
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
|
|
||||||
// Get container statuses. Container 0 should be killed, container 1
|
// Get container statuses. Container 0 should be killed, container 1
|
||||||
|
@ -429,7 +409,7 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
|
|
||||||
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
// Make sure the remaining OPPORTUNISTIC container starts its execution.
|
||||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
createContainerId(2), ContainerState.DONE, 30);
|
createContainerId(2), ContainerState.DONE, 40);
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
|
statRequest = GetContainerStatusesRequest.newInstance(Arrays.asList(
|
||||||
createContainerId(1)));
|
createContainerId(1)));
|
||||||
|
@ -488,13 +468,12 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
containerManager.startContainers(allRequests);
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
BaseContainerManagerTest.waitForNMContainerState(containerManager,
|
||||||
createContainerId(0), ContainerState.DONE, 30);
|
createContainerId(0), ContainerState.DONE, 40);
|
||||||
Thread.sleep(5000);
|
Thread.sleep(5000);
|
||||||
|
|
||||||
// Get container statuses. Container 0 should be killed, container 1
|
// Get container statuses. Container 0 should be killed, container 1
|
||||||
// should be queued and container 2 should be running.
|
// should be queued and container 2 should be running.
|
||||||
int killedContainers = 0;
|
int killedContainers = 0;
|
||||||
int runningContainers = 0;
|
|
||||||
List<ContainerId> statList = new ArrayList<ContainerId>();
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
for (int i = 0; i < 4; i++) {
|
for (int i = 0; i < 4; i++) {
|
||||||
statList.add(createContainerId(i));
|
statList.add(createContainerId(i));
|
||||||
|
@ -508,14 +487,108 @@ public class TestQueuingContainerManager extends BaseContainerManagerTest {
|
||||||
"Container killed by the ApplicationMaster")) {
|
"Container killed by the ApplicationMaster")) {
|
||||||
killedContainers++;
|
killedContainers++;
|
||||||
}
|
}
|
||||||
if (status.getState() ==
|
|
||||||
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
|
|
||||||
runningContainers++;
|
|
||||||
}
|
|
||||||
System.out.println("\nStatus : [" + status + "]\n");
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
Assert.assertEquals(2, killedContainers);
|
Assert.assertEquals(2, killedContainers);
|
||||||
Assert.assertEquals(2, runningContainers);
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start running one GUARANTEED container and queue two OPPORTUNISTIC ones.
|
||||||
|
* Try killing one of the two queued containers.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testStopQueuedContainer() throws Exception {
|
||||||
|
shouldDeleteWait = true;
|
||||||
|
containerManager.start();
|
||||||
|
|
||||||
|
ContainerLaunchContext containerLaunchContext =
|
||||||
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
||||||
|
|
||||||
|
List<StartContainerRequest> list = new ArrayList<>();
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(2048, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.GUARANTEED)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(512, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
list.add(StartContainerRequest.newInstance(
|
||||||
|
containerLaunchContext,
|
||||||
|
createContainerToken(createContainerId(2), DUMMY_RM_IDENTIFIER,
|
||||||
|
context.getNodeId(),
|
||||||
|
user, BuilderUtils.newResource(512, 1),
|
||||||
|
context.getContainerTokenSecretManager(), null,
|
||||||
|
ExecutionType.OPPORTUNISTIC)));
|
||||||
|
|
||||||
|
StartContainersRequest allRequests =
|
||||||
|
StartContainersRequest.newInstance(list);
|
||||||
|
containerManager.startContainers(allRequests);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
// Assert there is initially one container running and two queued.
|
||||||
|
int runningContainersNo = 0;
|
||||||
|
int queuedContainersNo = 0;
|
||||||
|
List<ContainerId> statList = new ArrayList<ContainerId>();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
statList.add(createContainerId(i));
|
||||||
|
}
|
||||||
|
GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
|
||||||
|
.newInstance(statList);
|
||||||
|
List<ContainerStatus> containerStatuses = containerManager
|
||||||
|
.getContainerStatuses(statRequest).getContainerStatuses();
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
if (status.getState() ==
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING) {
|
||||||
|
runningContainersNo++;
|
||||||
|
} else if (status.getState() ==
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED) {
|
||||||
|
queuedContainersNo++;
|
||||||
|
}
|
||||||
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
Assert.assertEquals(1, runningContainersNo);
|
||||||
|
Assert.assertEquals(2, queuedContainersNo);
|
||||||
|
|
||||||
|
// Stop one of the two queued containers.
|
||||||
|
StopContainersRequest stopRequest = StopContainersRequest.
|
||||||
|
newInstance(Arrays.asList(createContainerId(1)));
|
||||||
|
containerManager.stopContainers(stopRequest);
|
||||||
|
|
||||||
|
Thread.sleep(2000);
|
||||||
|
|
||||||
|
// Assert queued container got properly stopped.
|
||||||
|
statList.clear();
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
statList.add(createContainerId(i));
|
||||||
|
}
|
||||||
|
statRequest = GetContainerStatusesRequest.newInstance(statList);
|
||||||
|
containerStatuses = containerManager.getContainerStatuses(statRequest)
|
||||||
|
.getContainerStatuses();
|
||||||
|
for (ContainerStatus status : containerStatuses) {
|
||||||
|
if (status.getContainerId().equals(createContainerId(0))) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
|
||||||
|
status.getState());
|
||||||
|
} else if (status.getContainerId().equals(createContainerId(1))) {
|
||||||
|
Assert.assertTrue(status.getDiagnostics().contains(
|
||||||
|
"Queued container request removed"));
|
||||||
|
} else if (status.getContainerId().equals(createContainerId(2))) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
org.apache.hadoop.yarn.api.records.ContainerState.QUEUED,
|
||||||
|
status.getState());
|
||||||
|
}
|
||||||
|
System.out.println("\nStatus : [" + status + "]\n");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue