From bd69ea408f8fdd8293836ce1089fe9b01616f2f7 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Sun, 7 Jun 2015 11:37:52 -0700 Subject: [PATCH] YARN-3655. FairScheduler: potential livelock due to maxAMShare limitation and container reservation. (Zhihai Xu via kasha) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSAppAttempt.java | 130 ++++---- .../scheduler/fair/FSQueue.java | 15 + .../scheduler/fair/FairScheduler.java | 42 +-- .../scheduler/fair/TestFairScheduler.java | 282 ++++++++++++++++++ 5 files changed, 379 insertions(+), 93 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index cc28977a09b..d90433d31e5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -498,6 +498,9 @@ Release 2.8.0 - UNRELEASED YARN-3766. Fixed the apps table column error of generic history web UI. (Xuan Gong via zjshen) + YARN-3655. FairScheduler: potential livelock due to maxAMShare limitation + and container reservation. (Zhihai Xu via kasha) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 6287debc0ec..7419446aa5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -541,39 +541,37 @@ public class FSAppAttempt extends SchedulerApplicationAttempt } return container.getResource(); - } else { - if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) { - return Resources.none(); - } - - // The desired container won't fit here, so reserve - reserve(request.getPriority(), node, container, reserved); - - return FairScheduler.CONTAINER_RESERVED; } + + // The desired container won't fit here, so reserve + reserve(request.getPriority(), node, container, reserved); + + return FairScheduler.CONTAINER_RESERVED; } private boolean hasNodeOrRackLocalRequests(Priority priority) { return getResourceRequests(priority).size() > 1; } - private Resource assignContainer(FSSchedulerNode node, boolean reserved) { - if (LOG.isDebugEnabled()) { - LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); - } - + /** + * Whether the AM container for this app is over maxAMShare limit. + */ + private boolean isOverAMShareLimit() { // Check the AM resource usage for the leaf queue if (!isAmRunning() && !getUnmanagedAM()) { List ask = appSchedulingInfo.getAllResourceRequests(); if (ask.isEmpty() || !getQueue().canRunAppAM( ask.get(0).getCapability())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Skipping allocation because maxAMShare limit would " + - "be exceeded"); - } - return Resources.none(); + return true; } } + return false; + } + + private Resource assignContainer(FSSchedulerNode node, boolean reserved) { + if (LOG.isDebugEnabled()) { + LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved); + } Collection prioritiesToTry = (reserved) ? Arrays.asList(node.getReservedContainer().getReservedPriority()) : @@ -584,8 +582,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt // (not scheduled) in order to promote better locality. synchronized (this) { for (Priority priority : prioritiesToTry) { - if (getTotalRequiredResources(priority) <= 0 || - !hasContainerForNode(priority, node)) { + // Skip it for reserved container, since + // we already check it in isValidReservation. + if (!reserved && !hasContainerForNode(priority, node)) { continue; } @@ -650,42 +649,11 @@ public class FSAppAttempt extends SchedulerApplicationAttempt return Resources.none(); } - /** - * Called when this application already has an existing reservation on the - * given node. Sees whether we can turn the reservation into an allocation. - * Also checks whether the application needs the reservation anymore, and - * releases it if not. - * - * @param node - * Node that the application has an existing reservation on - */ - public Resource assignReservedContainer(FSSchedulerNode node) { - RMContainer rmContainer = node.getReservedContainer(); - Priority priority = rmContainer.getReservedPriority(); - - // Make sure the application still needs requests at this priority - if (getTotalRequiredResources(priority) == 0) { - unreserve(priority, node); - return Resources.none(); - } - - // Fail early if the reserved container won't fit. - // Note that we have an assumption here that there's only one container size - // per priority. - if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(), - node.getAvailableResource())) { - return Resources.none(); - } - - return assignContainer(node, true); - } - - /** * Whether this app has containers requests that could be satisfied on the * given node, if the node had full space. */ - public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { + private boolean hasContainerForNode(Priority prio, FSSchedulerNode node) { ResourceRequest anyRequest = getResourceRequest(prio, ResourceRequest.ANY); ResourceRequest rackRequest = getResourceRequest(prio, node.getRackName()); ResourceRequest nodeRequest = getResourceRequest(prio, node.getNodeName()); @@ -703,9 +671,56 @@ public class FSAppAttempt extends SchedulerApplicationAttempt (nodeRequest != null && nodeRequest.getNumContainers() > 0)) && // The requested container must be able to fit on the node: Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, - anyRequest.getCapability(), node.getRMNode().getTotalCapability()); + anyRequest.getCapability(), + node.getRMNode().getTotalCapability()) && + // The requested container must fit in queue maximum share: + getQueue().fitsInMaxShare(anyRequest.getCapability()); } + private boolean isValidReservation(FSSchedulerNode node) { + Priority reservedPriority = node.getReservedContainer(). + getReservedPriority(); + return hasContainerForNode(reservedPriority, node) && + !isOverAMShareLimit(); + } + + /** + * Called when this application already has an existing reservation on the + * given node. Sees whether we can turn the reservation into an allocation. + * Also checks whether the application needs the reservation anymore, and + * releases it if not. + * + * @param node + * Node that the application has an existing reservation on + * @return whether the reservation on the given node is valid. + */ + public boolean assignReservedContainer(FSSchedulerNode node) { + RMContainer rmContainer = node.getReservedContainer(); + Priority reservedPriority = rmContainer.getReservedPriority(); + + if (!isValidReservation(node)) { + // Don't hold the reservation if app can no longer use it + LOG.info("Releasing reservation that cannot be satisfied for " + + "application " + getApplicationAttemptId() + " on node " + node); + unreserve(reservedPriority, node); + return false; + } + + // Reservation valid; try to fulfill the reservation + if (LOG.isDebugEnabled()) { + LOG.debug("Trying to fulfill reservation for application " + + getApplicationAttemptId() + " on node: " + node); + } + + // Fail early if the reserved container won't fit. + // Note that we have an assumption here that + // there's only one container size per priority. + if (Resources.fitsIn(node.getReservedContainer().getReservedResource(), + node.getAvailableResource())) { + assignContainer(node, true); + } + return true; + } static class RMContainerComparator implements Comparator, Serializable { @@ -795,6 +810,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt @Override public Resource assignContainer(FSSchedulerNode node) { + if (isOverAMShareLimit()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping allocation because maxAMShare limit would " + + "be exceeded"); + } + return Resources.none(); + } return assignContainer(node, false); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index ade2880544e..e488c76ed66 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -330,4 +330,19 @@ public abstract class FSQueue implements Queue, Schedulable { @Override public void decPendingResource(String nodeLabel, Resource resourceToDec) { } + + public boolean fitsInMaxShare(Resource additionalResource) { + Resource usagePlusAddition = + Resources.add(getResourceUsage(), additionalResource); + + if (!Resources.fitsIn(usagePlusAddition, getMaxShare())) { + return false; + } + + FSQueue parentQueue = getParent(); + if (parentQueue != null) { + return parentQueue.fitsInMaxShare(additionalResource); + } + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 64b3f12f67e..2ed3b2a1bf7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -34,7 +34,6 @@ import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; @@ -1075,31 +1074,12 @@ public class FairScheduler extends // 1. Check for reserved applications // 2. Schedule if there are no reservations + boolean validReservation = false; FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable(); if (reservedAppSchedulable != null) { - Priority reservedPriority = node.getReservedContainer().getReservedPriority(); - FSQueue queue = reservedAppSchedulable.getQueue(); - - if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node) - || !fitsInMaxShare(queue, - node.getReservedContainer().getReservedResource())) { - // Don't hold the reservation if app can no longer use it - LOG.info("Releasing reservation that cannot be satisfied for application " - + reservedAppSchedulable.getApplicationAttemptId() - + " on node " + node); - reservedAppSchedulable.unreserve(reservedPriority, node); - reservedAppSchedulable = null; - } else { - // Reservation exists; try to fulfill the reservation - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to fulfill reservation for application " - + reservedAppSchedulable.getApplicationAttemptId() - + " on node: " + node); - } - node.getReservedAppSchedulable().assignReservedContainer(node); - } + validReservation = reservedAppSchedulable.assignReservedContainer(node); } - if (reservedAppSchedulable == null) { + if (!validReservation) { // No reservation, schedule at queue which is farthest below fair share int assignedContainers = 0; while (node.getReservedContainer() == null) { @@ -1117,22 +1097,6 @@ public class FairScheduler extends updateRootQueueMetrics(); } - static boolean fitsInMaxShare(FSQueue queue, Resource - additionalResource) { - Resource usagePlusAddition = - Resources.add(queue.getResourceUsage(), additionalResource); - - if (!Resources.fitsIn(usagePlusAddition, queue.getMaxShare())) { - return false; - } - - FSQueue parentQueue = queue.getParent(); - if (parentQueue != null) { - return fitsInMaxShare(parentQueue, additionalResource); - } - return true; - } - public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 94fdc1a7368..56e8adcf13c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -3701,6 +3701,288 @@ public class TestFairScheduler extends FairSchedulerTestBase { 0, queue2.getAmResourceUsage().getMemory()); } + /** + * The test verifies container gets reserved when not over maxAMShare, + * reserved container gets unreserved when over maxAMShare, + * container doesn't get reserved when over maxAMShare, + * reserved container is turned into an allocation and + * superfluously reserved container gets unreserved. + * 1. create three nodes: Node1 is 10G, Node2 is 10G and Node3 is 5G. + * 2. APP1 allocated 1G on Node1 and APP2 allocated 1G on Node2. + * 3. APP3 reserved 10G on Node1 and Node2. + * 4. APP4 allocated 5G on Node3, which makes APP3 over maxAMShare. + * 5. Remove APP1 to make Node1 have 10G available resource. + * 6. APP3 unreserved its container on Node1 because it is over maxAMShare. + * 7. APP5 allocated 1G on Node1 after APP3 unreserved its container. + * 8. Remove APP3. + * 9. APP6 failed to reserve a 10G container on Node1 due to AMShare limit. + * 10. APP7 allocated 1G on Node1. + * 11. Remove APP4 and APP5. + * 12. APP6 reserved 10G on Node1 and Node2. + * 13. APP8 failed to allocate a 1G container on Node1 and Node2 because + * APP6 reserved Node1 and Node2. + * 14. Remove APP2. + * 15. APP6 turned the 10G reservation into an allocation on node2. + * 16. APP6 unreserved its container on node1, APP8 allocated 1G on Node1. + */ + @Test + public void testQueueMaxAMShareWithContainerReservation() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0.5"); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10240, 10), + 1, "127.0.0.1"); + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(10240, 10), + 2, "127.0.0.2"); + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(5120, 5), + 3, "127.0.0.3"); + NodeAddedSchedulerEvent nodeE1 = new NodeAddedSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateE1 = new NodeUpdateSchedulerEvent(node1); + NodeAddedSchedulerEvent nodeE2 = new NodeAddedSchedulerEvent(node2); + NodeUpdateSchedulerEvent updateE2 = new NodeUpdateSchedulerEvent(node2); + NodeAddedSchedulerEvent nodeE3 = new NodeAddedSchedulerEvent(node3); + NodeUpdateSchedulerEvent updateE3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeE1); + scheduler.handle(nodeE2); + scheduler.handle(nodeE3); + scheduler.update(); + FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1", + true); + Resource amResource1 = Resource.newInstance(1024, 1); + Resource amResource2 = Resource.newInstance(1024, 1); + Resource amResource3 = Resource.newInstance(10240, 1); + Resource amResource4 = Resource.newInstance(5120, 1); + Resource amResource5 = Resource.newInstance(1024, 1); + Resource amResource6 = Resource.newInstance(10240, 1); + Resource amResource7 = Resource.newInstance(1024, 1); + Resource amResource8 = Resource.newInstance(1024, 1); + int amPriority = RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(); + ApplicationAttemptId attId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(attId1, "queue1", "user1", amResource1); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId1); + FSAppAttempt app1 = scheduler.getSchedulerApp(attId1); + scheduler.update(); + // Allocate app1's AM container on node1. + scheduler.handle(updateE1); + assertEquals("Application1's AM requests 1024 MB memory", + 1024, app1.getAMResource().getMemory()); + assertEquals("Application1's AM should be running", + 1, app1.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 1024 MB memory", + 1024, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId2 = createAppAttemptId(2, 1); + createApplicationWithAMResource(attId2, "queue1", "user1", amResource2); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId2); + FSAppAttempt app2 = scheduler.getSchedulerApp(attId2); + scheduler.update(); + // Allocate app2's AM container on node2. + scheduler.handle(updateE2); + assertEquals("Application2's AM requests 1024 MB memory", + 1024, app2.getAMResource().getMemory()); + assertEquals("Application2's AM should be running", + 1, app2.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId3 = createAppAttemptId(3, 1); + createApplicationWithAMResource(attId3, "queue1", "user1", amResource3); + createSchedulingRequestExistingApplication(10240, 1, amPriority, attId3); + FSAppAttempt app3 = scheduler.getSchedulerApp(attId3); + scheduler.update(); + // app3 reserves a container on node1 because node1's available resource + // is less than app3's AM container resource. + scheduler.handle(updateE1); + // Similarly app3 reserves a container on node2. + scheduler.handle(updateE2); + assertEquals("Application3's AM resource shouldn't be updated", + 0, app3.getAMResource().getMemory()); + assertEquals("Application3's AM should not be running", + 0, app3.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId4 = createAppAttemptId(4, 1); + createApplicationWithAMResource(attId4, "queue1", "user1", amResource4); + createSchedulingRequestExistingApplication(5120, 1, amPriority, attId4); + FSAppAttempt app4 = scheduler.getSchedulerApp(attId4); + scheduler.update(); + // app4 can't allocate its AM container on node1 because + // app3 already reserved its container on node1. + scheduler.handle(updateE1); + assertEquals("Application4's AM resource shouldn't be updated", + 0, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should not be running", + 0, app4.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + scheduler.update(); + // Allocate app4's AM container on node3. + scheduler.handle(updateE3); + assertEquals("Application4's AM requests 5120 MB memory", + 5120, app4.getAMResource().getMemory()); + assertEquals("Application4's AM should be running", + 1, app4.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 7168 MB memory", + 7168, queue1.getAmResourceUsage().getMemory()); + + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = + new AppAttemptRemovedSchedulerEvent(attId1, + RMAppAttemptState.FINISHED, false); + // Release app1's AM container on node1. + scheduler.handle(appRemovedEvent1); + assertEquals("Queue1's AM resource usage should be 6144 MB memory", + 6144, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId5 = createAppAttemptId(5, 1); + createApplicationWithAMResource(attId5, "queue1", "user1", amResource5); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId5); + FSAppAttempt app5 = scheduler.getSchedulerApp(attId5); + scheduler.update(); + // app5 can allocate its AM container on node1 after + // app3 unreserve its container on node1 due to + // exceeding queue MaxAMShare limit. + scheduler.handle(updateE1); + assertEquals("Application5's AM requests 1024 MB memory", + 1024, app5.getAMResource().getMemory()); + assertEquals("Application5's AM should be running", + 1, app5.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 7168 MB memory", + 7168, queue1.getAmResourceUsage().getMemory()); + + AppAttemptRemovedSchedulerEvent appRemovedEvent3 = + new AppAttemptRemovedSchedulerEvent(attId3, + RMAppAttemptState.FINISHED, false); + // Remove app3. + scheduler.handle(appRemovedEvent3); + assertEquals("Queue1's AM resource usage should be 7168 MB memory", + 7168, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId6 = createAppAttemptId(6, 1); + createApplicationWithAMResource(attId6, "queue1", "user1", amResource6); + createSchedulingRequestExistingApplication(10240, 1, amPriority, attId6); + FSAppAttempt app6 = scheduler.getSchedulerApp(attId6); + scheduler.update(); + // app6 can't reserve a container on node1 because + // it exceeds queue MaxAMShare limit. + scheduler.handle(updateE1); + assertEquals("Application6's AM resource shouldn't be updated", + 0, app6.getAMResource().getMemory()); + assertEquals("Application6's AM should not be running", + 0, app6.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 7168 MB memory", + 7168, queue1.getAmResourceUsage().getMemory()); + + ApplicationAttemptId attId7 = createAppAttemptId(7, 1); + createApplicationWithAMResource(attId7, "queue1", "user1", amResource7); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId7); + FSAppAttempt app7 = scheduler.getSchedulerApp(attId7); + scheduler.update(); + // Allocate app7's AM container on node1 to prove + // app6 didn't reserve a container on node1. + scheduler.handle(updateE1); + assertEquals("Application7's AM requests 1024 MB memory", + 1024, app7.getAMResource().getMemory()); + assertEquals("Application7's AM should be running", + 1, app7.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 8192 MB memory", + 8192, queue1.getAmResourceUsage().getMemory()); + + AppAttemptRemovedSchedulerEvent appRemovedEvent4 = + new AppAttemptRemovedSchedulerEvent(attId4, + RMAppAttemptState.FINISHED, false); + // Release app4's AM container on node3. + scheduler.handle(appRemovedEvent4); + assertEquals("Queue1's AM resource usage should be 3072 MB memory", + 3072, queue1.getAmResourceUsage().getMemory()); + + AppAttemptRemovedSchedulerEvent appRemovedEvent5 = + new AppAttemptRemovedSchedulerEvent(attId5, + RMAppAttemptState.FINISHED, false); + // Release app5's AM container on node1. + scheduler.handle(appRemovedEvent5); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + scheduler.update(); + // app6 reserves a container on node1 because node1's available resource + // is less than app6's AM container resource and + // app6 is not over AMShare limit. + scheduler.handle(updateE1); + // Similarly app6 reserves a container on node2. + scheduler.handle(updateE2); + + ApplicationAttemptId attId8 = createAppAttemptId(8, 1); + createApplicationWithAMResource(attId8, "queue1", "user1", amResource8); + createSchedulingRequestExistingApplication(1024, 1, amPriority, attId8); + FSAppAttempt app8 = scheduler.getSchedulerApp(attId8); + scheduler.update(); + // app8 can't allocate a container on node1 because + // app6 already reserved a container on node1. + scheduler.handle(updateE1); + assertEquals("Application8's AM resource shouldn't be updated", + 0, app8.getAMResource().getMemory()); + assertEquals("Application8's AM should not be running", + 0, app8.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + scheduler.update(); + // app8 can't allocate a container on node2 because + // app6 already reserved a container on node2. + scheduler.handle(updateE2); + assertEquals("Application8's AM resource shouldn't be updated", + 0, app8.getAMResource().getMemory()); + assertEquals("Application8's AM should not be running", + 0, app8.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 2048 MB memory", + 2048, queue1.getAmResourceUsage().getMemory()); + + AppAttemptRemovedSchedulerEvent appRemovedEvent2 = + new AppAttemptRemovedSchedulerEvent(attId2, + RMAppAttemptState.FINISHED, false); + // Release app2's AM container on node2. + scheduler.handle(appRemovedEvent2); + assertEquals("Queue1's AM resource usage should be 1024 MB memory", + 1024, queue1.getAmResourceUsage().getMemory()); + + scheduler.update(); + // app6 turns the reservation into an allocation on node2. + scheduler.handle(updateE2); + assertEquals("Application6's AM requests 10240 MB memory", + 10240, app6.getAMResource().getMemory()); + assertEquals("Application6's AM should be running", + 1, app6.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 11264 MB memory", + 11264, queue1.getAmResourceUsage().getMemory()); + + scheduler.update(); + // app6 unreserve its container on node1 because + // it already got a container on node2. + // Now app8 can allocate its AM container on node1. + scheduler.handle(updateE1); + assertEquals("Application8's AM requests 1024 MB memory", + 1024, app8.getAMResource().getMemory()); + assertEquals("Application8's AM should be running", + 1, app8.getLiveContainers().size()); + assertEquals("Queue1's AM resource usage should be 12288 MB memory", + 12288, queue1.getAmResourceUsage().getMemory()); + } + @Test public void testMaxRunningAppsHierarchicalQueues() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);