From 9a0ac56a5c2bbb7ee59e1fbc205c88710c98b55a Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Wed, 3 Aug 2016 18:53:14 +0000
Subject: [PATCH] YARN-4280. CapacityScheduler reservations may not prevent
 indefinite postponement on a busy cluster. Contributed by Kuhu Shukla

(cherry picked from commit 4d92aefd35d4517d9435d81bafdec0d77905a7a0)
---
 .../scheduler/capacity/CSAssignment.java           |  31 +-
 .../scheduler/capacity/LeafQueue.java              |   7 +-
 .../scheduler/capacity/ParentQueue.java            |  43 ++-
 .../allocator/AbstractContainerAllocator.java      |  11 +-
 .../allocator/IncreaseContainerAllocator.java      |  10 +-
 .../capacity/TestCapacityScheduler.java            | 288 +++++++++++++++++-
 6 files changed, 362 insertions(+), 28 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 6406efe9be2..7bea9afb1d4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -34,36 +34,47 @@ public class CSAssignment {
   public static final CSAssignment NULL_ASSIGNMENT =
       new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
 
-  public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+  public static final CSAssignment SKIP_ASSIGNMENT =
+      new CSAssignment(SkippedType.OTHER);
 
   private Resource resource;
   private NodeType type;
   private RMContainer excessReservation;
   private FiCaSchedulerApp application;
-  private final boolean skipped;
+  private SkippedType skipped;
+
+  /**
+   * Reason for the queue to get skipped.
+   */
+  public enum SkippedType {
+    NONE,
+    QUEUE_LIMIT,
+    OTHER
+  }
+
   private boolean fulfilledReservation;
   private final AssignmentInformation assignmentInformation;
   private boolean increaseAllocation;
   private List<RMContainer> containersToKill;
 
   public CSAssignment(Resource resource, NodeType type) {
-    this(resource, type, null, null, false, false);
+    this(resource, type, null, null, SkippedType.NONE, false);
   }
 
   public CSAssignment(FiCaSchedulerApp application,
       RMContainer excessReservation) {
     this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL,
-        excessReservation, application, false, false);
+        excessReservation, application, SkippedType.NONE, false);
   }
 
-  public CSAssignment(boolean skipped) {
+  public CSAssignment(SkippedType skipped) {
     this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped,
         false);
   }
 
   public CSAssignment(Resource resource, NodeType type,
       RMContainer excessReservation, FiCaSchedulerApp application,
-      boolean skipped, boolean fulfilledReservation) {
+      SkippedType skipped, boolean fulfilledReservation) {
     this.resource = resource;
     this.type = type;
     this.excessReservation = excessReservation;
@@ -105,10 +116,14 @@ public class CSAssignment {
     excessReservation = rmContainer;
   }
 
-  public boolean getSkipped() {
+  public SkippedType getSkippedType() {
     return skipped;
   }
-  
+
+  public void setSkippedType(SkippedType skippedType) {
+    this.skipped = skippedType;
+  }
+
   @Override
   public String toString() {
     String ret = "resource:" + resource.toString();

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 9aae9095c15..a243e93ae0d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -972,8 +973,12 @@ public class LeafQueue extends AbstractCSQueue {
 
         // Done
         return assignment;
-      } else if (assignment.getSkipped()) {
+      } else if (assignment.getSkippedType()
+          == CSAssignment.SkippedType.OTHER) {
         application.updateNodeInfoForAMDiagnostics(node);
+      } else if(assignment.getSkippedType()
+          == CSAssignment.SkippedType.QUEUE_LIMIT) {
+        return assignment;
       } else {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications
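The LeafQueue hunk above is the main consumer of the new enum: an application-level skip (OTHER) still only records AM diagnostics, while a queue-limit skip (QUEUE_LIMIT) is returned immediately so the parent queue can react to it. Below is a minimal sketch of that three-way decision using the public CSAssignment API introduced by this patch; the helper class and method names are hypothetical, not part of the patch.

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment.SkippedType;

    // Hypothetical helper mirroring the LeafQueue branch above.
    public final class SkippedTypeBranchSketch {

      // True when the assignment should be handed straight back to the parent
      // queue (the queue itself is blocked); false when iteration may continue.
      static boolean propagateToParent(CSAssignment assignment) {
        if (assignment.getSkippedType() == SkippedType.QUEUE_LIMIT) {
          // The queue hit one of its limits; the parent must learn about it
          // so it can adjust the limits offered to sibling queues.
          return true;
        }
        // SkippedType.OTHER (application-level skip) and NONE fall through.
        return false;
      }

      public static void main(String[] args) {
        CSAssignment blockedByLimit = new CSAssignment(SkippedType.QUEUE_LIMIT);
        System.out.println(propagateToParent(blockedByLimit));               // true
        System.out.println(propagateToParent(CSAssignment.SKIP_ASSIGNMENT)); // false (OTHER)
      }
    }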
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 6fcd6c19985..9ae35ee5026 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -37,7 +37,6 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -474,6 +473,7 @@ public class ParentQueue extends AbstractCSQueue {
             " cluster=" + clusterResource);
       } else {
+        assignment.setSkippedType(assignedToChild.getSkippedType());
         break;
       }
@@ -511,14 +511,14 @@
   }
 
   private ResourceLimits getResourceLimitsOfChild(CSQueue child,
-      Resource clusterResource, ResourceLimits parentLimits,
+      Resource clusterResource, Resource parentLimits,
       String nodePartition) {
     // Set resource-limit of a given child, child.limit =
     // min(my.limit - my.used + child.used, child.max)
 
     // Parent available resource = parent-limit - parent-used-resource
     Resource parentMaxAvailableResource = Resources.subtract(
-        parentLimits.getLimit(), queueUsage.getUsed(nodePartition));
+        parentLimits, queueUsage.getUsed(nodePartition));
     // Deduct killable from used
     Resources.addTo(parentMaxAvailableResource,
         getTotalKillableResource(nodePartition));
@@ -568,9 +568,9 @@
   private synchronized CSAssignment assignContainersToChildQueues(
       Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
       SchedulingMode schedulingMode) {
-    CSAssignment assignment =
-        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-
+    CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
+
+    Resource parentLimits = limits.getLimit();
     printChildQueues();
 
     // Try to assign to most 'under-served' sub-queue
@@ -584,20 +584,21 @@
 
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
-          getResourceLimitsOfChild(childQueue, cluster, limits, node.getPartition());
+          getResourceLimitsOfChild(childQueue, cluster, parentLimits,
+              node.getPartition());
 
-      assignment = childQueue.assignContainers(cluster, node,
+      CSAssignment childAssignment = childQueue.assignContainers(cluster, node,
           childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
-            " stats: " + childQueue + " --> " +
-            assignment.getResource() + ", " + assignment.getType());
+            " stats: " + childQueue + " --> " +
+            childAssignment.getResource() + ", " + childAssignment.getType());
       }
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
       if (Resources.greaterThan(
               resourceCalculator, cluster,
-              assignment.getResource(), Resources.none())) {
+              childAssignment.getResource(), Resources.none())) {
         // Only update childQueues when we doing non-partitioned node
         // allocation.
         if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
@@ -610,7 +611,24 @@
             printChildQueues();
           }
         }
+        assignment = childAssignment;
         break;
+      } else if (childAssignment.getSkippedType() ==
+          CSAssignment.SkippedType.QUEUE_LIMIT) {
+        if (assignment.getSkippedType() !=
+            CSAssignment.SkippedType.QUEUE_LIMIT) {
+          assignment = childAssignment;
+        }
+        Resource resourceToSubtract = Resources.max(resourceCalculator,
+            cluster, childLimits.getHeadroom(), Resources.none());
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("Decrease parentLimits " + parentLimits +
+              " for " + this.getQueueName() + " by " +
+              resourceToSubtract + " as childQueue=" +
+              childQueue.getQueueName() + " is blocked");
+        }
+        parentLimits = Resources.subtract(parentLimits,
+            resourceToSubtract);
       }
     }
 
@@ -731,7 +749,8 @@
     for (CSQueue childQueue : childQueues) {
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
-          clusterResource, resourceLimits, RMNodeLabelsManager.NO_LABEL);
+          clusterResource, resourceLimits.getLimit(),
+          RMNodeLabelsManager.NO_LABEL);
 
       childQueue.updateClusterResource(clusterResource, childLimits);
     }
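The else-if added to assignContainersToChildQueues above is the heart of the fix: when a child reports QUEUE_LIMIT, the parent deducts that child's remaining headroom from the limit it offers to the children that follow it, so siblings can no longer keep absorbing the space a blocked, reserving child is waiting on. Below is a standalone sketch of that bookkeeping using the YARN Resources utilities; the class name and all numbers are made up for illustration.

    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
    import org.apache.hadoop.yarn.util.resource.Resources;

    // Illustrative sketch of the parent-limit deduction added above.
    public final class ParentLimitSketch {
      public static void main(String[] args) {
        ResourceCalculator rc = new DefaultResourceCalculator();
        Resource cluster = Resource.newInstance(16 * 1024, 16);

        // Limit the parent can hand out in this scheduling pass.
        Resource parentLimits = Resource.newInstance(8 * 1024, 8);
        // Headroom still reported by the child that was skipped with QUEUE_LIMIT.
        Resource blockedChildHeadroom = Resource.newInstance(2 * 1024, 2);

        // Never subtract a negative headroom (same guard as in the patch).
        Resource resourceToSubtract =
            Resources.max(rc, cluster, blockedChildHeadroom, Resources.none());
        parentLimits = Resources.subtract(parentLimits, resourceToSubtract);

        // Later siblings are now offered 6 GB instead of the full 8 GB, so they
        // cannot repeatedly take the space the blocked child is waiting on.
        System.out.println(parentLimits);
      }
    }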
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
index afac235aa36..4d5a7dc7f06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java
@@ -55,8 +55,10 @@ public abstract class AbstractContainerAllocator {
       Resource clusterResource, ContainerAllocation result,
       RMContainer rmContainer) {
     // Handle skipped
-    boolean skipped =
-        (result.getAllocationState() == AllocationState.APP_SKIPPED);
+    CSAssignment.SkippedType skipped =
+        (result.getAllocationState() == AllocationState.APP_SKIPPED) ?
+        CSAssignment.SkippedType.OTHER :
+        CSAssignment.SkippedType.NONE;
     CSAssignment assignment = new CSAssignment(skipped);
     assignment.setApplication(application);
@@ -110,6 +112,11 @@ public abstract class AbstractContainerAllocator {
       }
 
       assignment.setContainersToKill(result.getToKillContainers());
+    } else {
+      if (result.getAllocationState() == AllocationState.QUEUE_SKIPPED) {
+        assignment.setSkippedType(
+            CSAssignment.SkippedType.QUEUE_LIMIT);
+      }
     }
 
     return assignment;

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
index 4a2ae1839fa..509dfba1cef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java
@@ -71,7 +71,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
       SchedContainerChangeRequest request) {
     CSAssignment assignment =
         new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
-            application, false, false);
+            application, CSAssignment.SkippedType.NONE, false);
     Resources.addTo(assignment.getAssignmentInformation().getReserved(),
         request.getDeltaCapacity());
     assignment.getAssignmentInformation().incrReservations();
@@ -88,7 +88,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
       SchedContainerChangeRequest request, boolean fromReservation) {
     CSAssignment assignment =
         new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null,
-            application, false, fromReservation);
+            application, CSAssignment.SkippedType.NONE, fromReservation);
     Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
         request.getDeltaCapacity());
     assignment.getAssignmentInformation().incrAllocations();
@@ -311,7 +311,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
           // Try to allocate the increase request
           assigned = allocateIncreaseRequest(node, clusterResource,
               increaseRequest);
-          if (!assigned.getSkipped()) {
+          if (assigned.getSkippedType()
+              == CSAssignment.SkippedType.NONE) {
             // When we don't skip this request, which means we either allocated
            // OR reserved this request. We will break
             break;
@@ -328,7 +329,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator {
         }
 
         // We may have allocated something
-        if (assigned != null && !assigned.getSkipped()) {
+        if (assigned != null && assigned.getSkippedType()
+            == CSAssignment.SkippedType.NONE) {
           break;
         }
       }
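The allocator changes above are where the new enum values get produced: APP_SKIPPED becomes OTHER, QUEUE_SKIPPED becomes QUEUE_LIMIT, and anything that was allocated or reserved stays NONE, which is also the new success check in IncreaseContainerAllocator. Below is a small sketch of that mapping; the helper class is hypothetical, and it assumes AllocationState can be imported from the allocator package used in the hunks above.

    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;

    // Hypothetical helper mirroring the mapping performed by the
    // AbstractContainerAllocator change above.
    public final class SkippedTypeMappingSketch {
      static CSAssignment.SkippedType toSkippedType(AllocationState state) {
        switch (state) {
          case APP_SKIPPED:
            // The application could not use the node; only diagnostics change.
            return CSAssignment.SkippedType.OTHER;
          case QUEUE_SKIPPED:
            // The queue itself is at a limit; parents will shrink their limits.
            return CSAssignment.SkippedType.QUEUE_LIMIT;
          default:
            // Allocated, reserved, or any other state the caller treats as
            // "not skipped".
            return CSAssignment.SkippedType.NONE;
        }
      }

      public static void main(String[] args) {
        System.out.println(toSkippedType(AllocationState.QUEUE_SKIPPED)); // QUEUE_LIMIT
      }
    }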
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index d3567f5110c..2da7adbd1a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
+    ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -166,6 +168,12 @@ public class TestCapacityScheduler {
   private static final String B3 = B + ".b3";
   private static float A_CAPACITY = 10.5f;
   private static float B_CAPACITY = 89.5f;
+  private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1";
+  private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2";
+  private static final String X1 = P1 + ".x1";
+  private static final String X2 = P1 + ".x2";
+  private static final String Y1 = P2 + ".y1";
+  private static final String Y2 = P2 + ".y2";
   private static float A1_CAPACITY = 30;
   private static float A2_CAPACITY = 70;
   private static float B1_CAPACITY = 79.2f;
@@ -411,7 +419,52 @@ public class TestCapacityScheduler {
     LOG.info("Setup top-level queues a and b");
     return conf;
   }
-  
+
+  private CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"a", "b"});
+
+    conf.setCapacity(A, 80f);
+    conf.setCapacity(B, 20f);
+    conf.setUserLimitFactor(A, 100);
+    conf.setUserLimitFactor(B, 100);
+    conf.setMaximumCapacity(A, 100);
+    conf.setMaximumCapacity(B, 100);
+    LOG.info("Setup top-level queues a and b");
+    return conf;
+  }
+
+  private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[]{"p1", "p2"});
+
+    conf.setCapacity(P1, 50f);
+    conf.setMaximumCapacity(P1, 50f);
+    conf.setCapacity(P2, 50f);
+    conf.setMaximumCapacity(P2, 100f);
+    // Define 2nd-level queues
+    conf.setQueues(P1, new String[] {"x1", "x2"});
+    conf.setCapacity(X1, 80f);
+    conf.setMaximumCapacity(X1, 100f);
+    conf.setUserLimitFactor(X1, 2f);
+    conf.setCapacity(X2, 20f);
+    conf.setMaximumCapacity(X2, 100f);
+    conf.setUserLimitFactor(X2, 2f);
+
+    conf.setQueues(P2, new String[]{"y1", "y2"});
+    conf.setCapacity(Y1, 80f);
+    conf.setUserLimitFactor(Y1, 2f);
+    conf.setCapacity(Y2, 20f);
+    conf.setUserLimitFactor(Y2, 2f);
+    return conf;
+  }
+
   @Test
   public void testMaximumCapacitySetup() {
     float delta = 0.0000001f;
@@ -3415,4 +3468,237 @@
     scheduler.handle(appRemovedEvent1);
     rm.stop();
   }
+
+  @Test
+  public void testCSReservationWithRootUnblocked() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    conf.setResourceComparator(DominantResourceCalculator.class);
+    setupOtherBlockedQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    ParentQueue q = (ParentQueue) cs.getQueue("p1");
+
+    Assert.assertNotNull(q);
+    String host = "127.0.0.1";
+    String host1 = "test";
+    RMNode node =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
+    RMNode node1 =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
+    cs.handle(new NodeAddedSchedulerEvent(node));
+    cs.handle(new NodeAddedSchedulerEvent(node1));
+    ApplicationAttemptId appAttemptId1 =
+        appHelper(rm, cs, 100, 1, "x1", "userX1");
+    ApplicationAttemptId appAttemptId2 =
+        appHelper(rm, cs, 100, 2, "x2", "userX2");
+    ApplicationAttemptId appAttemptId3 =
+        appHelper(rm, cs, 100, 3, "y1", "userY1");
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    ResourceRequest y1Req = null;
+    ResourceRequest x1Req = null;
+    ResourceRequest x2Req = null;
+    for(int i=0; i < 4; i++) {
+      y1Req = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId3,
+          Collections.singletonList(y1Req),
+          Collections.emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("Y1 Used Resource should be 4 GB", 4 * GB,
+        cs.getQueue("y1").getUsedResources().getMemorySize());
+    assertEquals("P2 Used Resource should be 4 GB", 4 * GB,
+        cs.getQueue("p2").getUsedResources().getMemorySize());
+
+    for(int i=0; i < 7; i++) {
+      x1Req = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId1,
+          Collections.singletonList(x1Req),
+          Collections.emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("x1").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+
+    x2Req = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
+    cs.allocate(appAttemptId2,
+        Collections.singletonList(x2Req),
+        Collections.emptyList(),
+        null, null, null, null);
+    CapacityScheduler.schedule(cs);
+    assertEquals("X2 Used Resource should be 0", 0,
+        cs.getQueue("x2").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+    //this assign should fail
+    x1Req = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+    cs.allocate(appAttemptId1,
+        Collections.singletonList(x1Req),
+        Collections.emptyList(),
+        null, null, null, null);
+    CapacityScheduler.schedule(cs);
+    assertEquals("X1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("x1").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 7 GB", 7 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+
+    //this should get thru
+    for (int i=0; i < 4; i++) {
+      y1Req = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId3,
+          Collections.singletonList(y1Req),
+          Collections.emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
+        cs.getQueue("p2").getUsedResources().getMemorySize());
+
+    //Free a container from X1
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2);
+    cs.handle(new ContainerExpiredSchedulerEvent(containerId));
+
+    //Schedule pending request
+    CapacityScheduler.schedule(cs);
+    assertEquals("X2 Used Resource should be 2 GB", 2 * GB,
+        cs.getQueue("x2").getUsedResources().getMemorySize());
+    assertEquals("P1 Used Resource should be 8 GB", 8 * GB,
+        cs.getQueue("p1").getUsedResources().getMemorySize());
+    assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
+        cs.getQueue("p2").getUsedResources().getMemorySize());
+    assertEquals("Root Used Resource should be 16 GB", 16 * GB,
+        cs.getRootQueue().getUsedResources().getMemorySize());
+    rm.stop();
+  }
+
+  @Test
+  public void testCSQueueBlocked() throws Exception {
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    setupBlockedQueueConfiguration(conf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+    LeafQueue q = (LeafQueue) cs.getQueue("a");
+
+    Assert.assertNotNull(q);
+    String host = "127.0.0.1";
+    String host1 = "test";
+    RMNode node =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host);
+    RMNode node1 =
+        MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1);
+    cs.handle(new NodeAddedSchedulerEvent(node));
+    cs.handle(new NodeAddedSchedulerEvent(node1));
+    //add app begin
+    ApplicationAttemptId appAttemptId1 =
+        appHelper(rm, cs, 100, 1, "a", "user1");
+    ApplicationAttemptId appAttemptId2 =
+        appHelper(rm, cs, 100, 2, "b", "user2");
+    //add app end
+
+    RecordFactory recordFactory =
+        RecordFactoryProvider.getRecordFactory(null);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    ResourceRequest r1 = TestUtils.createResourceRequest(
+        ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory);
+    //This will allocate for app1
+    cs.allocate(appAttemptId1, Collections.singletonList(r1),
+        Collections.emptyList(),
+        null, null, null, null).getContainers().size();
+    CapacityScheduler.schedule(cs);
+    ResourceRequest r2 = null;
+    for (int i =0; i < 13; i++) {
+      r2 = TestUtils.createResourceRequest(
+          ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory);
+      cs.allocate(appAttemptId2,
+          Collections.singletonList(r2),
+          Collections.emptyList(),
+          null, null, null, null);
+      CapacityScheduler.schedule(cs);
+    }
+    assertEquals("A Used Resource should be 2 GB", 2 * GB,
+        cs.getQueue("a").getUsedResources().getMemorySize());
Resource should be 2 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + + cs.allocate(appAttemptId2, Collections.singletonList(r2), + Collections.emptyList(), null, null, null, null); + CapacityScheduler.schedule(cs); + //Check blocked Resource + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10); + ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11); + + cs.handle(new ContainerExpiredSchedulerEvent(containerId1)); + cs.handle(new ContainerExpiredSchedulerEvent(containerId2)); + CapacityScheduler.schedule(cs); + rm.drainEvents(); + assertEquals("A Used Resource should be 2 GB", 4 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 12 GB", 12 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + assertEquals("Used Resource on Root should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs, + int clusterTs, int appId, String queue, + String user) { + ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, appId); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt1.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + when(attempt1.getSubmissionContext()).thenReturn(submissionContext); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, queue, user); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + return appAttemptId1; + } }