From b6005770954e2103e2177e0f78de1f9f9618f082 Mon Sep 17 00:00:00 2001 From: Junping Du Date: Mon, 9 Jan 2017 18:14:46 -0800 Subject: [PATCH] YARN-4148. When killing app, RM releases app's resource before they are released by NM. Contributed by Jason Lowe. (cherry picked from commit 945db55f2e6521d33d4f90bbb09179b0feba5e7a) --- .../scheduler/AbstractYarnScheduler.java | 8 +- .../scheduler/SchedulerNode.java | 82 +++++++++++-- .../scheduler/capacity/LeafQueue.java | 2 +- .../common/fica/FiCaSchedulerNode.java | 4 +- .../scheduler/fair/FairScheduler.java | 2 +- .../scheduler/fifo/FifoScheduler.java | 2 +- .../scheduler/TestAbstractYarnScheduler.java | 114 ++++++++++++++++++ .../capacity/TestChildQueueOrder.java | 7 +- 8 files changed, 201 insertions(+), 20 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index a68989321a9..145b9b4e6b5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -274,6 +274,7 @@ public abstract class AbstractYarnScheduler } application.containerLaunchedOnNode(containerId, node.getNodeID()); + node.containerStarted(containerId); } finally { readLock.unlock(); } @@ -599,7 +600,7 @@ public abstract class AbstractYarnScheduler " in state: " + rmContainer.getState() + " event:" + event); } getSchedulerNode(rmContainer.getNodeId()).releaseContainer( - rmContainer.getContainer()); + rmContainer.getContainerId(), false); } // If the container is getting killed in ACQUIRED state, the requester (AM @@ -933,6 +934,7 @@ public abstract class AbstractYarnScheduler protected int updateCompletedContainers(List completedContainers, Resource releasedResources, NodeId nodeId) { int releasedContainers = 0; + SchedulerNode node = getNode(nodeId); List untrackedContainerIdList = new ArrayList(); for (ContainerStatus completedContainer : completedContainers) { ContainerId containerId = completedContainer.getContainerId(); @@ -940,6 +942,10 @@ public abstract class AbstractYarnScheduler RMContainer container = getRMContainer(containerId); completedContainer(container, completedContainer, RMContainerEventType.FINISHED); + if (node != null) { + node.releaseContainer(containerId, true); + } + if (container != null) { releasedContainers++; Resource ars = container.getAllocatedResource(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 759db0575cb..15fd830d7d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -66,7 +66,7 @@ public abstract class SchedulerNode { ResourceUtilization.newInstance(0, 0, 0f); /* set of containers that are allocated containers */ - protected final Map launchedContainers = + private final Map launchedContainers = new HashMap<>(); private final RMNode rmNode; @@ -148,14 +148,26 @@ public abstract class SchedulerNode { * application. * @param rmContainer Allocated container */ - public synchronized void allocateContainer(RMContainer rmContainer) { + public void allocateContainer(RMContainer rmContainer) { + allocateContainer(rmContainer, false); + } + + /** + * The Scheduler has allocated containers on this node to the given + * application. + * @param rmContainer Allocated container + * @param launchedOnNode True if the container has been launched + */ + private synchronized void allocateContainer(RMContainer rmContainer, + boolean launchedOnNode) { Container container = rmContainer.getContainer(); if (rmContainer.getExecutionType() == ExecutionType.GUARANTEED) { deductUnallocatedResource(container.getResource()); ++numContainers; } - launchedContainers.put(container.getId(), rmContainer); + launchedContainers.put(container.getId(), + new ContainerInfo(rmContainer, launchedOnNode)); if (LOG.isDebugEnabled()) { LOG.debug("Assigned container " + container.getId() + " of capacity " @@ -258,19 +270,25 @@ public abstract class SchedulerNode { /** * Release an allocated container on this node. - * @param container Container to be released. + * @param containerId ID of container to be released. + * @param releasedByNode whether the release originates from a node update. */ - public synchronized void releaseContainer(Container container) { - if (!isValidContainer(container.getId())) { - LOG.error("Invalid container released " + container); + public synchronized void releaseContainer(ContainerId containerId, + boolean releasedByNode) { + ContainerInfo info = launchedContainers.get(containerId); + if (info == null) { return; } - // Remove the containers from the nodemanger - if (null != launchedContainers.remove(container.getId())) { - updateResourceForReleasedContainer(container); + if (!releasedByNode && info.launchedOnNode) { + // wait until node reports container has completed + return; } + launchedContainers.remove(containerId); + Container container = info.container.getContainer(); + updateResourceForReleasedContainer(container); + if (LOG.isDebugEnabled()) { LOG.debug("Released container " + container.getId() + " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() @@ -280,6 +298,17 @@ public abstract class SchedulerNode { } } + /** + * Inform the node that a container has launched. + * @param containerId ID of the launched container + */ + public synchronized void containerStarted(ContainerId containerId) { + ContainerInfo info = launchedContainers.get(containerId); + if (info != null) { + info.launchedOnNode = true; + } + } + /** * Add unallocated resources to the node. This is used when unallocating a * container. @@ -345,7 +374,25 @@ public abstract class SchedulerNode { * @return List of running containers in the node. */ public synchronized List getCopiedListOfRunningContainers() { - return new ArrayList(launchedContainers.values()); + List result = new ArrayList<>(launchedContainers.size()); + for (ContainerInfo info : launchedContainers.values()) { + result.add(info.container); + } + return result; + } + + /** + * Get the container for the specified container ID. + * @param containerId The container ID + * @return The container for the specified container ID + */ + protected synchronized RMContainer getContainer(ContainerId containerId) { + RMContainer container = null; + ContainerInfo info = launchedContainers.get(containerId); + if (info != null) { + container = info.container; + } + return container; } /** @@ -373,7 +420,7 @@ public abstract class SchedulerNode { if (rmContainer.getState().equals(RMContainerState.COMPLETED)) { return; } - allocateContainer(rmContainer); + allocateContainer(rmContainer, true); } /** @@ -438,4 +485,15 @@ public abstract class SchedulerNode { public ResourceUtilization getNodeUtilization() { return this.nodeUtilization; } + + + private static class ContainerInfo { + private final RMContainer container; + private boolean launchedOnNode; + + public ContainerInfo(RMContainer container, boolean launchedOnNode) { + this.container = container; + this.launchedOnNode = launchedOnNode; + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 647c8dba2ce..25fb7396ce1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1712,7 +1712,7 @@ public class LeafQueue extends AbstractCSQueue { removed = application.containerCompleted(rmContainer, containerStatus, event, node.getPartition()); - node.releaseContainer(container); + node.releaseContainer(rmContainer.getContainerId(), false); } // Book-keeping diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java index 344daf2ae4c..8eac9297241 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java @@ -126,7 +126,7 @@ public class FiCaSchedulerNode extends SchedulerNode { // According to decisions from preemption policy, mark the container to killable public synchronized void markContainerToKillable(ContainerId containerId) { - RMContainer c = launchedContainers.get(containerId); + RMContainer c = getContainer(containerId); if (c != null && !killableContainers.containsKey(containerId)) { killableContainers.put(containerId, c); Resources.addTo(totalKillableResources, c.getAllocatedResource()); @@ -136,7 +136,7 @@ public class FiCaSchedulerNode extends SchedulerNode { // According to decisions from preemption policy, mark the container to // non-killable public synchronized void markContainerToNonKillable(ContainerId containerId) { - RMContainer c = launchedContainers.get(containerId); + RMContainer c = getContainer(containerId); if (c != null && killableContainers.containsKey(containerId)) { killableContainers.remove(containerId); Resources.subtractFrom(totalKillableResources, c.getAllocatedResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 29b8d8e82a8..40186813b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -907,7 +907,7 @@ public class FairScheduler extends application.unreserve(rmContainer.getReservedSchedulerKey(), node); } else{ application.containerCompleted(rmContainer, containerStatus, event); - node.releaseContainer(container); + node.releaseContainer(rmContainer.getContainerId(), false); updateRootQueueMetrics(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index f0c0942ba88..6b02e22aeae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -854,7 +854,7 @@ public class FifoScheduler extends RMNodeLabelsManager.NO_LABEL); // Inform the node - node.releaseContainer(container); + node.releaseContainer(rmContainer.getContainerId(), false); // Update total usage Resources.subtractFrom(usedResource, container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java index 63953397d11..89440dc8fe7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceOption; @@ -423,6 +424,119 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase { } } + @Test(timeout=60000) + public void testContainerReleasedByNode() throws Exception { + System.out.println("Starting testContainerReleasedByNode"); + configureScheduler(); + YarnConfiguration conf = getConf(); + MockRM rm1 = new MockRM(conf); + try { + rm1.start(); + RMApp app1 = + rm1.submitApp(200, "name", "user", + new HashMap(), false, "default", + -1, null, "Test", false, true); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService()); + nm1.registerNode(); + + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // allocate a container that fills more than half the node + am1.allocate("127.0.0.1", 8192, 1, new ArrayList()); + nm1.nodeHeartbeat(true); + + // wait for containers to be allocated. + List containers = + am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.isEmpty()) { + Thread.sleep(10); + nm1.nodeHeartbeat(true); + containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + } + + // release the container from the AM + ContainerId cid = containers.get(0).getId(); + List releasedContainers = new ArrayList<>(1); + releasedContainers.add(cid); + List completedContainers = am1.allocate( + new ArrayList(), releasedContainers) + .getCompletedContainersStatuses(); + while (completedContainers.isEmpty()) { + Thread.sleep(10); + completedContainers = am1.allocate( + new ArrayList(), releasedContainers) + .getCompletedContainersStatuses(); + } + + // verify new container can be allocated immediately because container + // never launched on the node + containers = am1.allocate("127.0.0.1", 8192, 1, + new ArrayList()).getAllocatedContainers(); + nm1.nodeHeartbeat(true); + while (containers.isEmpty()) { + Thread.sleep(10); + nm1.nodeHeartbeat(true); + containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + } + + // launch the container on the node + cid = containers.get(0).getId(); + nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(), + ContainerState.RUNNING); + rm1.waitForState(nm1, cid, RMContainerState.RUNNING); + + // release the container from the AM + releasedContainers.clear(); + releasedContainers.add(cid); + completedContainers = am1.allocate( + new ArrayList(), releasedContainers) + .getCompletedContainersStatuses(); + while (completedContainers.isEmpty()) { + Thread.sleep(10); + completedContainers = am1.allocate( + new ArrayList(), releasedContainers) + .getCompletedContainersStatuses(); + } + + // verify new container cannot be allocated immediately because container + // has not been released by the node + containers = am1.allocate("127.0.0.1", 8192, 1, + new ArrayList()).getAllocatedContainers(); + nm1.nodeHeartbeat(true); + Assert.assertTrue("new container allocated before node freed old", + containers.isEmpty()); + for (int i = 0; i < 10; ++i) { + Thread.sleep(10); + containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + nm1.nodeHeartbeat(true); + Assert.assertTrue("new container allocated before node freed old", + containers.isEmpty()); + } + + // free the old container from the node + nm1.nodeHeartbeat(cid.getApplicationAttemptId(), cid.getContainerId(), + ContainerState.COMPLETE); + + // verify new container is now allocated + containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + while (containers.isEmpty()) { + Thread.sleep(10); + nm1.nodeHeartbeat(true); + containers = am1.allocate(new ArrayList(), + new ArrayList()).getAllocatedContainers(); + } + } finally { + rm1.stop(); + System.out.println("Stopping testContainerReleasedByNode"); + } + } + @Test(timeout = 60000) public void testResourceRequestRestoreWhenRMContainerIsAtAllocated() throws Exception { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index a6ae0c2d4e5..6c2b4e52ba5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -162,7 +163,8 @@ public class TestChildQueueOrder { }). when(queue).assignContainers(eq(clusterResource), any(PlacementSet.class), any(ResourceLimits.class), any(SchedulingMode.class)); - doNothing().when(node).releaseContainer(any(Container.class)); + doNothing().when(node).releaseContainer(any(ContainerId.class), + anyBoolean()); } @@ -234,7 +236,8 @@ public class TestChildQueueOrder { FiCaSchedulerNode node_0 = TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB); - doNothing().when(node_0).releaseContainer(any(Container.class)); + doNothing().when(node_0).releaseContainer(any(ContainerId.class), + anyBoolean()); final Resource clusterResource = Resources.createResource(numNodes * (memoryPerNode*GB),