From aca7eea6111cf4a8905bf44e0fb598f2f5ec37e8 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 3 Aug 2016 19:01:14 +0000 Subject: [PATCH] YARN-4280. CapacityScheduler reservations may not prevent indefinite postponement on a busy cluster. Contributed by Kuhu Shukla --- .../scheduler/capacity/CSAssignment.java | 29 +- .../scheduler/capacity/LeafQueue.java | 7 +- .../scheduler/capacity/ParentQueue.java | 41 ++- .../allocator/AbstractContainerAllocator.java | 11 +- .../allocator/IncreaseContainerAllocator.java | 10 +- .../capacity/TestCapacityScheduler.java | 288 +++++++++++++++++- 6 files changed, 359 insertions(+), 27 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java index 68f6f120f1d..5ed55a0c365 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java @@ -32,35 +32,44 @@ public class CSAssignment { public static final CSAssignment NULL_ASSIGNMENT = new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - public static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true); + public static final CSAssignment SKIP_ASSIGNMENT = + new CSAssignment(SkippedType.OTHER); private Resource resource; private NodeType type; private RMContainer excessReservation; private FiCaSchedulerApp application; - private final boolean skipped; + private SkippedType skipped; + /** + * Reason for the queue 
to get skipped. + */ + public enum SkippedType { + NONE, + QUEUE_LIMIT, + OTHER + } private boolean fulfilledReservation; private final AssignmentInformation assignmentInformation; private boolean increaseAllocation; public CSAssignment(Resource resource, NodeType type) { - this(resource, type, null, null, false, false); + this(resource, type, null, null, SkippedType.NONE, false); } public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) { this(excessReservation.getContainer().getResource(), NodeType.NODE_LOCAL, - excessReservation, application, false, false); + excessReservation, application, SkippedType.NONE, false); } - public CSAssignment(boolean skipped) { + public CSAssignment(SkippedType skipped) { this(Resource.newInstance(0, 0), NodeType.NODE_LOCAL, null, null, skipped, false); } public CSAssignment(Resource resource, NodeType type, RMContainer excessReservation, FiCaSchedulerApp application, - boolean skipped, boolean fulfilledReservation) { + SkippedType skipped, boolean fulfilledReservation) { this.resource = resource; this.type = type; this.excessReservation = excessReservation; @@ -102,10 +111,14 @@ public class CSAssignment { excessReservation = rmContainer; } - public boolean getSkipped() { + public SkippedType getSkippedType() { return skipped; } - + + public void setSkippedType(SkippedType skippedType) { + this.skipped = skippedType; + } + @Override public String toString() { String ret = "resource:" + resource.toString(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index df20b3c8266..006d8403452 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; + import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest; @@ -931,8 +932,12 @@ public class LeafQueue extends AbstractCSQueue { // Done return assignment; - } else if (assignment.getSkipped()) { + } else if (assignment.getSkippedType() + == CSAssignment.SkippedType.OTHER) { application.updateNodeInfoForAMDiagnostics(node); + } else if(assignment.getSkippedType() + == CSAssignment.SkippedType.QUEUE_LIMIT) { + return assignment; } else { // If we don't allocate anything, and it is not skipped by application, // we will return to respect FIFO of applications diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index a7d8796eb12..c5f1e11d7ef 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -467,6 +467,7 @@ public class ParentQueue extends AbstractCSQueue { " cluster=" + clusterResource); } else { + assignment.setSkippedType(assignedToChild.getSkippedType()); break; } @@ -500,13 +501,13 @@ public class ParentQueue extends AbstractCSQueue { } private ResourceLimits getResourceLimitsOfChild(CSQueue child, - Resource clusterResource, ResourceLimits parentLimits) { + Resource clusterResource, Resource parentLimits) { // Set resource-limit of a given child, child.limit = // min(my.limit - my.used + child.used, child.max) // Parent available resource = parent-limit - parent-used-resource Resource parentMaxAvailableResource = - Resources.subtract(parentLimits.getLimit(), getUsedResources()); + Resources.subtract(parentLimits, getUsedResources()); // Child's limit = parent-available-resource + child-used Resource childLimit = @@ -552,9 +553,9 @@ public class ParentQueue extends AbstractCSQueue { private synchronized CSAssignment assignContainersToChildQueues( Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, SchedulingMode schedulingMode) { - CSAssignment assignment = - new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL); - + CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; + + Resource parentLimits = limits.getLimit(); printChildQueues(); // Try to assign to most 'under-served' sub-queue @@ -568,20 +569,20 @@ public class ParentQueue extends AbstractCSQueue { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, limits); + getResourceLimitsOfChild(childQueue, cluster, 
parentLimits); - assignment = childQueue.assignContainers(cluster, node, + CSAssignment childAssignment = childQueue.assignContainers(cluster, node, childLimits, schedulingMode); if(LOG.isDebugEnabled()) { LOG.debug("Assigned to queue: " + childQueue.getQueuePath() + - " stats: " + childQueue + " --> " + - assignment.getResource() + ", " + assignment.getType()); + " stats: " + childQueue + " --> " + + childAssignment.getResource() + ", " + childAssignment.getType()); } // If we do assign, remove the queue and re-insert in-order to re-sort if (Resources.greaterThan( resourceCalculator, cluster, - assignment.getResource(), Resources.none())) { + childAssignment.getResource(), Resources.none())) { // Only update childQueues when we doing non-partitioned node // allocation. if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { @@ -594,7 +595,24 @@ public class ParentQueue extends AbstractCSQueue { printChildQueues(); } } + assignment = childAssignment; break; + } else if (childAssignment.getSkippedType() == + CSAssignment.SkippedType.QUEUE_LIMIT) { + if (assignment.getSkippedType() != + CSAssignment.SkippedType.QUEUE_LIMIT) { + assignment = childAssignment; + } + Resource resourceToSubtract = Resources.max(resourceCalculator, + cluster, childLimits.getHeadroom(), Resources.none()); + if(LOG.isDebugEnabled()) { + LOG.debug("Decrease parentLimits " + parentLimits + + " for " + this.getQueueName() + " by " + + resourceToSubtract + " as childQueue=" + + childQueue.getQueueName() + " is blocked"); + } + parentLimits = Resources.subtract(parentLimits, + resourceToSubtract); } } @@ -715,7 +733,8 @@ public class ParentQueue extends AbstractCSQueue { for (CSQueue childQueue : childQueues) { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, clusterResource, resourceLimits); + getResourceLimitsOfChild(childQueue, clusterResource, + resourceLimits.getLimit()); 
childQueue.updateClusterResource(clusterResource, childLimits); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java index ee01bd17b30..7dd3fe54f5d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/AbstractContainerAllocator.java @@ -55,8 +55,10 @@ public abstract class AbstractContainerAllocator { Resource clusterResource, ContainerAllocation result, RMContainer rmContainer) { // Handle skipped - boolean skipped = - (result.getAllocationState() == AllocationState.APP_SKIPPED); + CSAssignment.SkippedType skipped = + (result.getAllocationState() == AllocationState.APP_SKIPPED) ? 
+ CSAssignment.SkippedType.OTHER : + CSAssignment.SkippedType.NONE; CSAssignment assignment = new CSAssignment(skipped); assignment.setApplication(application); @@ -108,6 +110,11 @@ public abstract class AbstractContainerAllocator { assignment.setFulfilledReservation(true); } } + } else { + if (result.getAllocationState() == AllocationState.QUEUE_SKIPPED) { + assignment.setSkippedType( + CSAssignment.SkippedType.QUEUE_LIMIT); + } } return assignment; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java index 16cf6d32597..0c272b8a4f7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/IncreaseContainerAllocator.java @@ -70,7 +70,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { SchedContainerChangeRequest request) { CSAssignment assignment = new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, - application, false, false); + application, CSAssignment.SkippedType.NONE, false); Resources.addTo(assignment.getAssignmentInformation().getReserved(), request.getDeltaCapacity()); assignment.getAssignmentInformation().incrReservations(); @@ -87,7 +87,7 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { SchedContainerChangeRequest request, boolean fromReservation) { CSAssignment 
assignment = new CSAssignment(request.getDeltaCapacity(), NodeType.NODE_LOCAL, null, - application, false, fromReservation); + application, CSAssignment.SkippedType.NONE, fromReservation); Resources.addTo(assignment.getAssignmentInformation().getAllocated(), request.getDeltaCapacity()); assignment.getAssignmentInformation().incrAllocations(); @@ -308,7 +308,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { // Try to allocate the increase request assigned = allocateIncreaseRequest(node, clusterResource, increaseRequest); - if (!assigned.getSkipped()) { + if (assigned.getSkippedType() + == CSAssignment.SkippedType.NONE) { // When we don't skip this request, which means we either allocated // OR reserved this request. We will break break; @@ -324,7 +325,8 @@ public class IncreaseContainerAllocator extends AbstractContainerAllocator { } // We may have allocated something - if (assigned != null && !assigned.getSkipped()) { + if (assigned != null && assigned.getSkippedType() + == CSAssignment.SkippedType.NONE) { break; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ca9a740a230..a3a4ca04ad3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -113,6 +113,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import 
org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event. + ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -164,6 +166,12 @@ public class TestCapacityScheduler { private static final String B3 = B + ".b3"; private static float A_CAPACITY = 10.5f; private static float B_CAPACITY = 89.5f; + private static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; + private static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2"; + private static final String X1 = P1 + ".x1"; + private static final String X2 = P1 + ".x2"; + private static final String Y1 = P2 + ".y1"; + private static final String Y2 = P2 + ".y2"; private static float A1_CAPACITY = 30; private static float A2_CAPACITY = 70; private static float B1_CAPACITY = 79.2f; @@ -409,7 +417,52 @@ public class TestCapacityScheduler { LOG.info("Setup top-level queues a and b"); return conf; } - + + private CapacitySchedulerConfiguration setupBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setUserLimitFactor(A, 100); + conf.setUserLimitFactor(B, 100); + conf.setMaximumCapacity(A, 100); + conf.setMaximumCapacity(B, 100); + LOG.info("Setup top-level queues a and b"); + return conf; + } + + private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // 
Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"p1", "p2"}); + + conf.setCapacity(P1, 50f); + conf.setMaximumCapacity(P1, 50f); + conf.setCapacity(P2, 50f); + conf.setMaximumCapacity(P2, 100f); + // Define 2nd-level queues + conf.setQueues(P1, new String[] {"x1", "x2"}); + conf.setCapacity(X1, 80f); + conf.setMaximumCapacity(X1, 100f); + conf.setUserLimitFactor(X1, 2f); + conf.setCapacity(X2, 20f); + conf.setMaximumCapacity(X2, 100f); + conf.setUserLimitFactor(X2, 2f); + + conf.setQueues(P2, new String[]{"y1", "y2"}); + conf.setCapacity(Y1, 80f); + conf.setUserLimitFactor(Y1, 2f); + conf.setCapacity(Y2, 20f); + conf.setUserLimitFactor(Y2, 2f); + return conf; + } + @Test public void testMaximumCapacitySetup() { float delta = 0.0000001f; @@ -3374,4 +3427,237 @@ public class TestCapacityScheduler { Assert.assertEquals(availableResource.getVirtualCores(), 0); } + + @Test + public void testCSReservationWithRootUnblocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setResourceComparator(DominantResourceCalculator.class); + setupOtherBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + ParentQueue q = (ParentQueue) cs.getQueue("p1"); + + Assert.assertNotNull(q); + String host = "127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "x1", "userX1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "x2", "userX2"); + ApplicationAttemptId 
appAttemptId3 = + appHelper(rm, cs, 100, 3, "y1", "userY1"); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest y1Req = null; + ResourceRequest x1Req = null; + ResourceRequest x2Req = null; + for(int i=0; i < 4; i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("Y1 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("y1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 4 GB", 4 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + for(int i=0; i < 7; i++) { + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("X1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("x1").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + x2Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(x2Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 0", 0, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + //this assign should fail + x1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, + 
Collections.singletonList(x1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + assertEquals("X1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("x1").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 7 GB", 7 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + + //this should get through + for (int i=0; i < 4; i++) { + y1Req = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId3, + Collections.singletonList(y1Req), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + + //Free a container from X1 + ContainerId containerId = ContainerId.newContainerId(appAttemptId1, 2); + cs.handle(new ContainerExpiredSchedulerEvent(containerId)); + + //Schedule pending request + CapacityScheduler.schedule(cs); + assertEquals("X2 Used Resource should be 2 GB", 2 * GB, + cs.getQueue("x2").getUsedResources().getMemorySize()); + assertEquals("P1 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p1").getUsedResources().getMemorySize()); + assertEquals("P2 Used Resource should be 8 GB", 8 * GB, + cs.getQueue("p2").getUsedResources().getMemorySize()); + assertEquals("Root Used Resource should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + @Test + public void testCSQueueBlocked() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupBlockedQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(conf); + rm.start(); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue q = (LeafQueue) cs.getQueue("a"); + + Assert.assertNotNull(q); + String host = 
"127.0.0.1"; + String host1 = "test"; + RMNode node = + MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 1, host); + RMNode node1 = + MockNodes.newNodeInfo(0, Resource.newInstance(8 * GB, 8), 2, host1); + cs.handle(new NodeAddedSchedulerEvent(node)); + cs.handle(new NodeAddedSchedulerEvent(node1)); + //add app begin + ApplicationAttemptId appAttemptId1 = + appHelper(rm, cs, 100, 1, "a", "user1"); + ApplicationAttemptId appAttemptId2 = + appHelper(rm, cs, 100, 2, "b", "user2"); + //add app end + + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + Priority priority = TestUtils.createMockPriority(1); + ResourceRequest r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + //This will allocate for app1 + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + ResourceRequest r2 = null; + for (int i =0; i < 13; i++) { + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId2, + Collections.singletonList(r2), + Collections.emptyList(), + null, null, null, null); + CapacityScheduler.schedule(cs); + } + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + r1 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 2 * GB, 1, true, priority, recordFactory); + r2 = TestUtils.createResourceRequest( + ResourceRequest.ANY, 1 * GB, 1, true, priority, recordFactory); + cs.allocate(appAttemptId1, Collections.singletonList(r1), + Collections.emptyList(), + null, null, null, null).getContainers().size(); + CapacityScheduler.schedule(cs); + + cs.allocate(appAttemptId2, Collections.singletonList(r2), + Collections.emptyList(), null, 
null, null, null); + CapacityScheduler.schedule(cs); + //Check blocked Resource + assertEquals("A Used Resource should be 2 GB", 2 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 13 GB", 13 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + + ContainerId containerId1 = ContainerId.newContainerId(appAttemptId2, 10); + ContainerId containerId2 =ContainerId.newContainerId(appAttemptId2, 11); + + cs.handle(new ContainerExpiredSchedulerEvent(containerId1)); + cs.handle(new ContainerExpiredSchedulerEvent(containerId2)); + CapacityScheduler.schedule(cs); + rm.drainEvents(); + assertEquals("A Used Resource should be 4 GB", 4 * GB, + cs.getQueue("a").getUsedResources().getMemorySize()); + assertEquals("B Used Resource should be 12 GB", 12 * GB, + cs.getQueue("b").getUsedResources().getMemorySize()); + assertEquals("Used Resource on Root should be 16 GB", 16 * GB, + cs.getRootQueue().getUsedResources().getMemorySize()); + rm.stop(); + } + + private ApplicationAttemptId appHelper(MockRM rm, CapacityScheduler cs, + int clusterTs, int appId, String queue, + String user) { + ApplicationId appId1 = BuilderUtils.newApplicationId(clusterTs, appId); + ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId( + appId1, appId); + + RMAppAttemptMetrics attemptMetric1 = + new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext()); + RMAppImpl app1 = mock(RMAppImpl.class); + when(app1.getApplicationId()).thenReturn(appId1); + RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class); + Container container = mock(Container.class); + when(attempt1.getMasterContainer()).thenReturn(container); + ApplicationSubmissionContext submissionContext = mock( + ApplicationSubmissionContext.class); + when(attempt1.getSubmissionContext()).thenReturn(submissionContext); + when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1); + when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1); + 
when(app1.getCurrentAppAttempt()).thenReturn(attempt1); + rm.getRMContext().getRMApps().put(appId1, app1); + + SchedulerEvent addAppEvent1 = + new AppAddedSchedulerEvent(appId1, queue, user); + cs.handle(addAppEvent1); + SchedulerEvent addAttemptEvent1 = + new AppAttemptAddedSchedulerEvent(appAttemptId1, false); + cs.handle(addAttemptEvent1); + return appAttemptId1; + } }