YARN-4148. When killing app, RM releases app's resource before they are released by NM. Contributed by Jason Lowe.

(cherry picked from commit 945db55f2e)
This commit is contained in:
Junping Du 2017-01-09 18:14:46 -08:00
parent 137547579b
commit b600577095
8 changed files with 201 additions and 20 deletions

View File

@ -274,6 +274,7 @@ public abstract class AbstractYarnScheduler
} }
application.containerLaunchedOnNode(containerId, node.getNodeID()); application.containerLaunchedOnNode(containerId, node.getNodeID());
node.containerStarted(containerId);
} finally { } finally {
readLock.unlock(); readLock.unlock();
} }
@ -599,7 +600,7 @@ public abstract class AbstractYarnScheduler
" in state: " + rmContainer.getState() + " event:" + event); " in state: " + rmContainer.getState() + " event:" + event);
} }
getSchedulerNode(rmContainer.getNodeId()).releaseContainer( getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
rmContainer.getContainer()); rmContainer.getContainerId(), false);
} }
// If the container is getting killed in ACQUIRED state, the requester (AM // If the container is getting killed in ACQUIRED state, the requester (AM
@ -933,6 +934,7 @@ public abstract class AbstractYarnScheduler
protected int updateCompletedContainers(List<ContainerStatus> protected int updateCompletedContainers(List<ContainerStatus>
completedContainers, Resource releasedResources, NodeId nodeId) { completedContainers, Resource releasedResources, NodeId nodeId) {
int releasedContainers = 0; int releasedContainers = 0;
SchedulerNode node = getNode(nodeId);
List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>(); List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
for (ContainerStatus completedContainer : completedContainers) { for (ContainerStatus completedContainer : completedContainers) {
ContainerId containerId = completedContainer.getContainerId(); ContainerId containerId = completedContainer.getContainerId();
@ -940,6 +942,10 @@ public abstract class AbstractYarnScheduler
RMContainer container = getRMContainer(containerId); RMContainer container = getRMContainer(containerId);
completedContainer(container, completedContainer(container,
completedContainer, RMContainerEventType.FINISHED); completedContainer, RMContainerEventType.FINISHED);
if (node != null) {
node.releaseContainer(containerId, true);
}
if (container != null) { if (container != null) {
releasedContainers++; releasedContainers++;
Resource ars = container.getAllocatedResource(); Resource ars = container.getAllocatedResource();

View File

@ -66,7 +66,7 @@ public abstract class SchedulerNode {
ResourceUtilization.newInstance(0, 0, 0f); ResourceUtilization.newInstance(0, 0, 0f);
/* set of containers that are allocated containers */ /* set of containers that are allocated containers */
protected final Map<ContainerId, RMContainer> launchedContainers = private final Map<ContainerId, ContainerInfo> launchedContainers =
new HashMap<>(); new HashMap<>();
private final RMNode rmNode; private final RMNode rmNode;
@ -148,14 +148,26 @@ public abstract class SchedulerNode {
* application. * application.
* @param rmContainer Allocated container * @param rmContainer Allocated container
*/ */
public synchronized void allocateContainer(RMContainer rmContainer) { public void allocateContainer(RMContainer rmContainer) {
allocateContainer(rmContainer, false);
}
/**
* The Scheduler has allocated containers on this node to the given
* application.
* @param rmContainer Allocated container
* @param launchedOnNode True if the container has been launched
*/
private synchronized void allocateContainer(RMContainer rmContainer,
boolean launchedOnNode) {
Container container = rmContainer.getContainer(); Container container = rmContainer.getContainer();
if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
deductUnallocatedResource(container.getResource()); deductUnallocatedResource(container.getResource());
++numContainers; ++numContainers;
} }
launchedContainers.put(container.getId(), rmContainer); launchedContainers.put(container.getId(),
new ContainerInfo(rmContainer, launchedOnNode));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Assigned container " + container.getId() + " of capacity " LOG.debug("Assigned container " + container.getId() + " of capacity "
@ -258,19 +270,25 @@ public abstract class SchedulerNode {
/** /**
* Release an allocated container on this node. * Release an allocated container on this node.
* @param container Container to be released. * @param containerId ID of container to be released.
* @param releasedByNode whether the release originates from a node update.
*/ */
public synchronized void releaseContainer(Container container) { public synchronized void releaseContainer(ContainerId containerId,
if (!isValidContainer(container.getId())) { boolean releasedByNode) {
LOG.error("Invalid container released " + container); ContainerInfo info = launchedContainers.get(containerId);
if (info == null) {
return; return;
} }
// Remove the containers from the nodemanger if (!releasedByNode && info.launchedOnNode) {
if (null != launchedContainers.remove(container.getId())) { // wait until node reports container has completed
updateResourceForReleasedContainer(container); return;
} }
launchedContainers.remove(containerId);
Container container = info.container.getContainer();
updateResourceForReleasedContainer(container);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Released container " + container.getId() + " of capacity " LOG.debug("Released container " + container.getId() + " of capacity "
+ container.getResource() + " on host " + rmNode.getNodeAddress() + container.getResource() + " on host " + rmNode.getNodeAddress()
@ -280,6 +298,17 @@ public abstract class SchedulerNode {
} }
} }
/**
* Inform the node that a container has launched.
* @param containerId ID of the launched container
*/
public synchronized void containerStarted(ContainerId containerId) {
ContainerInfo info = launchedContainers.get(containerId);
if (info != null) {
info.launchedOnNode = true;
}
}
/** /**
* Add unallocated resources to the node. This is used when unallocating a * Add unallocated resources to the node. This is used when unallocating a
* container. * container.
@ -345,7 +374,25 @@ public abstract class SchedulerNode {
* @return List of running containers in the node. * @return List of running containers in the node.
*/ */
public synchronized List<RMContainer> getCopiedListOfRunningContainers() { public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values()); List<RMContainer> result = new ArrayList<>(launchedContainers.size());
for (ContainerInfo info : launchedContainers.values()) {
result.add(info.container);
}
return result;
}
/**
* Get the container for the specified container ID.
* @param containerId The container ID
* @return The container for the specified container ID
*/
protected synchronized RMContainer getContainer(ContainerId containerId) {
RMContainer container = null;
ContainerInfo info = launchedContainers.get(containerId);
if (info != null) {
container = info.container;
}
return container;
} }
/** /**
@ -373,7 +420,7 @@ public abstract class SchedulerNode {
if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
return; return;
} }
allocateContainer(rmContainer); allocateContainer(rmContainer, true);
} }
/** /**
@ -438,4 +485,15 @@ public abstract class SchedulerNode {
public ResourceUtilization getNodeUtilization() { public ResourceUtilization getNodeUtilization() {
return this.nodeUtilization; return this.nodeUtilization;
} }
private static class ContainerInfo {
private final RMContainer container;
private boolean launchedOnNode;
public ContainerInfo(RMContainer container, boolean launchedOnNode) {
this.container = container;
this.launchedOnNode = launchedOnNode;
}
}
} }

View File

@ -1712,7 +1712,7 @@ public class LeafQueue extends AbstractCSQueue {
removed = application.containerCompleted(rmContainer, containerStatus, removed = application.containerCompleted(rmContainer, containerStatus,
event, node.getPartition()); event, node.getPartition());
node.releaseContainer(container); node.releaseContainer(rmContainer.getContainerId(), false);
} }
// Book-keeping // Book-keeping

View File

@ -126,7 +126,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
// According to decisions from preemption policy, mark the container to killable // According to decisions from preemption policy, mark the container to killable
public synchronized void markContainerToKillable(ContainerId containerId) { public synchronized void markContainerToKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId); RMContainer c = getContainer(containerId);
if (c != null && !killableContainers.containsKey(containerId)) { if (c != null && !killableContainers.containsKey(containerId)) {
killableContainers.put(containerId, c); killableContainers.put(containerId, c);
Resources.addTo(totalKillableResources, c.getAllocatedResource()); Resources.addTo(totalKillableResources, c.getAllocatedResource());
@ -136,7 +136,7 @@ public class FiCaSchedulerNode extends SchedulerNode {
// According to decisions from preemption policy, mark the container to // According to decisions from preemption policy, mark the container to
// non-killable // non-killable
public synchronized void markContainerToNonKillable(ContainerId containerId) { public synchronized void markContainerToNonKillable(ContainerId containerId) {
RMContainer c = launchedContainers.get(containerId); RMContainer c = getContainer(containerId);
if (c != null && killableContainers.containsKey(containerId)) { if (c != null && killableContainers.containsKey(containerId)) {
killableContainers.remove(containerId); killableContainers.remove(containerId);
Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); Resources.subtractFrom(totalKillableResources, c.getAllocatedResource());

View File

@ -907,7 +907,7 @@ public class FairScheduler extends
application.unreserve(rmContainer.getReservedSchedulerKey(), node); application.unreserve(rmContainer.getReservedSchedulerKey(), node);
} else{ } else{
application.containerCompleted(rmContainer, containerStatus, event); application.containerCompleted(rmContainer, containerStatus, event);
node.releaseContainer(container); node.releaseContainer(rmContainer.getContainerId(), false);
updateRootQueueMetrics(); updateRootQueueMetrics();
} }

View File

@ -854,7 +854,7 @@ public class FifoScheduler extends
RMNodeLabelsManager.NO_LABEL); RMNodeLabelsManager.NO_LABEL);
// Inform the node // Inform the node
node.releaseContainer(container); node.releaseContainer(rmContainer.getContainerId(), false);
// Update total usage // Update total usage
Resources.subtractFrom(usedResource, container.getResource()); Resources.subtractFrom(usedResource, container.getResource());

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceOption;
@ -423,6 +424,119 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
} }
} }
@Test(timeout=60000)
public void testContainerReleasedByNode() throws Exception {
System.out.println("Starting testContainerReleasedByNode");
configureScheduler();
YarnConfiguration conf = getConf();
MockRM rm1 = new MockRM(conf);
try {
rm1.start();
RMApp app1 =
rm1.submitApp(200, "name", "user",
new HashMap<ApplicationAccessType, String>(), false, "default",
-1, null, "Test", false, true);
MockNM nm1 =
new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
nm1.registerNode();
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// allocate a container that fills more than half the node
am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
nm1.nodeHeartbeat(true);
// wait for containers to be allocated.
List<Container> containers =
am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
// release the container from the AM
ContainerId cid = containers.get(0).getId();
List<ContainerId> releasedContainers = new ArrayList<>(1);
releasedContainers.add(cid);
List<ContainerStatus> completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
while (completedContainers.isEmpty()) {
Thread.sleep(10);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
}
// verify new container can be allocated immediately because container
// never launched on the node
containers = am1.allocate("127.0.0.1", 8192, 1,
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
// launch the container on the node
cid = containers.get(0).getId();
nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
ContainerState.RUNNING);
rm1.waitForState(nm1, cid, RMContainerState.RUNNING);
// release the container from the AM
releasedContainers.clear();
releasedContainers.add(cid);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
while (completedContainers.isEmpty()) {
Thread.sleep(10);
completedContainers = am1.allocate(
new ArrayList<ResourceRequest>(), releasedContainers)
.getCompletedContainersStatuses();
}
// verify new container cannot be allocated immediately because container
// has not been released by the node
containers = am1.allocate("127.0.0.1", 8192, 1,
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
Assert.assertTrue("new container allocated before node freed old",
containers.isEmpty());
for (int i = 0; i < 10; ++i) {
Thread.sleep(10);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
nm1.nodeHeartbeat(true);
Assert.assertTrue("new container allocated before node freed old",
containers.isEmpty());
}
// free the old container from the node
nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(),
ContainerState.COMPLETE);
// verify new container is now allocated
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
while (containers.isEmpty()) {
Thread.sleep(10);
nm1.nodeHeartbeat(true);
containers = am1.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
}
} finally {
rm1.stop();
System.out.println("Stopping testContainerReleasedByNode");
}
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() public void testResourceRequestRestoreWhenRMContainerIsAtAllocated()
throws Exception { throws Exception {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq; import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
@ -162,7 +163,8 @@ public class TestChildQueueOrder {
}). }).
when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class),
any(ResourceLimits.class), any(SchedulingMode.class)); any(ResourceLimits.class), any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(Container.class)); doNothing().when(node).releaseContainer(any(ContainerId.class),
anyBoolean());
} }
@ -234,7 +236,8 @@ public class TestChildQueueOrder {
FiCaSchedulerNode node_0 = FiCaSchedulerNode node_0 =
TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
doNothing().when(node_0).releaseContainer(any(Container.class)); doNothing().when(node_0).releaseContainer(any(ContainerId.class),
anyBoolean());
final Resource clusterResource = final Resource clusterResource =
Resources.createResource(numNodes * (memoryPerNode*GB), Resources.createResource(numNodes * (memoryPerNode*GB),