YARN-4148. When killing app, RM releases app's resource before they are released by NM. Contributed by Jason Lowe.

This commit is contained in:
Junping Du 2017-01-09 18:14:46 -08:00
parent 7ec609b289
commit 945db55f2e
8 changed files with 201 additions and 20 deletions

View File

@ -280,6 +280,7 @@ public abstract class AbstractYarnScheduler
}
application.containerLaunchedOnNode(containerId, node.getNodeID());
node.containerStarted(containerId);
} finally {
readLock.unlock();
}
@ -607,7 +608,7 @@ public abstract class AbstractYarnScheduler
" in state: " + rmContainer.getState() + " event:" + event);
}
getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
rmContainer.getContainer());
rmContainer.getContainerId(), false);
}
// If the container is getting killed in ACQUIRED state, the requester (AM
@ -941,6 +942,7 @@ public abstract class AbstractYarnScheduler
protected int updateCompletedContainers(List<ContainerStatus>
completedContainers, Resource releasedResources, NodeId nodeId) {
int releasedContainers = 0;
SchedulerNode node = getNode(nodeId);
List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId();
@ -948,6 +950,10 @@ public abstract class AbstractYarnScheduler
RMContainer container = getRMContainer(containerId);
completedContainer(container,
completedContainer, RMContainerEventType.FINISHED);
if (node != null) {
node.releaseContainer(containerId, true);
}
if (container != null) {
releasedContainers++;
Resource ars = container.getAllocatedResource();

View File

@ -66,7 +66,7 @@ public abstract class SchedulerNode {
ResourceUtilization.newInstance(0, 0, 0f);
/* set of containers that are allocated containers */
protected final Map<ContainerId, RMContainer> launchedContainers =
private final Map<ContainerId, ContainerInfo> launchedContainers =
new HashMap<>();
private final RMNode rmNode;
@ -148,14 +148,26 @@ public abstract class SchedulerNode {
* application.
* @param rmContainer Allocated container
*/
public synchronized void allocateContainer(RMContainer rmContainer) {
public void allocateContainer(RMContainer rmContainer) {
allocateContainer(rmContainer, false);
}
/**
* The Scheduler has allocated containers on this node to the given
* application.
* @param rmContainer Allocated container
* @param launchedOnNode True if the container has been launched
*/
private synchronized void allocateContainer(RMContainer rmContainer,
boolean launchedOnNode) {
Container container = rmContainer.getContainer();
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
deductUnallocatedResource(container.getResource());
++numContainers;
}
launchedContainers.put(container.getId(), rmContainer);
launchedContainers.put(container.getId(),
new ContainerInfo(rmContainer, launchedOnNode));
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container " + container.getId() + " of capacity "
@ -258,19 +270,25 @@ public abstract class SchedulerNode {
/**
* Release an allocated container on this node.
* @param container Container to be released.
* @param containerId ID of container to be released.
* @param releasedByNode whether the release originates from a node update.
*/
public synchronized void releaseContainer(Container container) {
if (!isValidContainer(container.getId())) {
LOG.error("Invalid container released " + container);
public synchronized void releaseContainer(ContainerId containerId,
boolean releasedByNode) {
ContainerInfo info = launchedContainers.get(containerId);
if (info == null) {
return;
}
// Remove the containers from the nodemanger
if (null != launchedContainers.remove(container.getId())) {
updateResourceForReleasedContainer(container);
if (!releasedByNode && info.launchedOnNode) {
// wait until node reports container has completed
return;
}
launchedContainers.remove(containerId);
Container container = info.container.getContainer();
updateResourceForReleasedContainer(container);
if (LOG.isDebugEnabled()) {
LOG.debug("Released container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress()
@ -280,6 +298,17 @@ public abstract class SchedulerNode {
}
}
/**
* Inform the node that a container has launched.
* @param containerId ID of the launched container
*/
public synchronized void containerStarted(ContainerId containerId) {
ContainerInfo info = launchedContainers.get(containerId);
if (info != null) {
info.launchedOnNode = true;
}
}
/**
* Add unallocated resources to the node. This is used when unallocating a
* container.
@ -345,7 +374,25 @@ public abstract class SchedulerNode {
* @return List of running containers in the node.
*/
public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
List<RMContainer> result = new ArrayList<>(launchedContainers.size());
for (ContainerInfo info : launchedContainers.values()) {
result.add(info.container);
}
return result;
}
/**
* Get the container for the specified container ID.
* @param containerId The container ID
* @return The container for the specified container ID
*/
protected synchronized RMContainer getContainer(ContainerId containerId) {
RMContainer container = null;
ContainerInfo info = launchedContainers.get(containerId);
if (info != null) {
container = info.container;
}
return container;
}
/**
@ -373,7 +420,7 @@ public abstract class SchedulerNode {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return;
}
allocateContainer(rmContainer);
allocateContainer(rmContainer, true);
}
/**
@ -438,4 +485,15 @@ public abstract class SchedulerNode {
public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization;
}
private static class ContainerInfo {
private final RMContainer container;
private boolean launchedOnNode;
public ContainerInfo(RMContainer container, boolean launchedOnNode) {
this.container = container;
this.launchedOnNode = launchedOnNode;
}
}
}

View File

@ -1703,7 +1703,7 @@ public class LeafQueue extends AbstractCSQueue {
removed = application.containerCompleted(rmContainer, containerStatus,
event, node.getPartition());
node.releaseContainer(container);
node.releaseContainer(rmContainer.getContainerId(), false);
}
// Book-keeping

View File

@ -126,7 +126,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
// According to decisions from preemption policy, mark the container to killable
public synchronized void markContainerToKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId);
RMContainer c = getContainer(containerId);
if (c != null && !killableContainers.containsKey(containerId)) {
killableContainers.put(containerId, c);
Resources.addTo(totalKillableResources, c.getAllocatedResource());
@ -136,7 +136,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
// According to decisions from preemption policy, mark the container to
// non-killable
public synchronized void markContainerToNonKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId);
RMContainer c = getContainer(containerId);
if (c != null && killableContainers.containsKey(containerId)) {
killableContainers.remove(containerId);
Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());

View File

@ -719,7 +719,7 @@ public class FairScheduler extends
application.unreserve(rmContainer.getReservedSchedulerKey(), node);
} else{
application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container);
node.releaseContainer(rmContainer.getContainerId(), false);
updateRootQueueMetrics();
}

View File

@ -843,7 +843,7 @@ public class FifoScheduler extends
RMNodeLabelsManager.NO_LABEL);
// Inform the node
node.releaseContainer(container);
node.releaseContainer(rmContainer.getContainerId(), false);
// Update total usage
Resources.subtractFrom(usedResource, container.getResource());

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@ -424,6 +425,119 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
}
}
@Test(timeout=60000)
public void testContainerReleasedByNode() throws Exception {
System.out.println("Starting testContainerReleasedByNode");
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default",
-1, null, "Test", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// allocate a container that fills more than half the node
am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
// release the container from the AM
ContainerId cid = containers.get(0).getId();
List<ContainerId> releasedContainers = new ArrayList<>(1);
releasedContainers.add(cid);
List<ContainerStatus> completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
while (completedContainers.isEmpty()) {
Thread.sleep(10);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
}
// verify new container can be allocated immediately because container
// never launched on the node
containers = am1.allocate("127.0.0.1", 8192, 1,
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
// launch the container on the node
cid = containers.get(0).getId();
nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
ContainerState.RUNNING);
rm1.waitForState(nm1, cid, RMContainerState.RUNNING);
// release the container from the AM
releasedContainers.clear();
releasedContainers.add(cid);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
while (completedContainers.isEmpty()) {
Thread.sleep(10);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
}
// verify new container cannot be allocated immediately because container
// has not been released by the node
containers = am1.allocate("127.0.0.1", 8192, 1,
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
Assert.assertTrue("new container allocated before node freed old",
containers.isEmpty());
for (int i = 0; i < 10; ++i) {
Thread.sleep(10);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
Assert.assertTrue("new container allocated before node freed old",
containers.isEmpty());
}
// free the old container from the node
nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
ContainerState.COMPLETE);
// verify new container is now allocated
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
} finally {
rm1.stop();
System.out.println("Stopping testContainerReleasedByNode");
}
}
@Test(timeout = 60000)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
throws Exception {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@ -164,7 +165,8 @@ public class TestChildQueueOrder {
}).
when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(Container.class));
doNothing().when(node).releaseContainer(any(ContainerId.class),
anyBoolean());
}
@ -236,7 +238,8 @@ public class TestChildQueueOrder {
FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
doNothing().when(node_0).releaseContainer(any(Container.class));
doNothing().when(node_0).releaseContainer(any(ContainerId.class),
anyBoolean());
final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB),