diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 721eb362a93..820d2fa58bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -38,6 +38,9 @@ public class ResourceLimits { // containers. private volatile Resource headroom; + // How much resource should be reserved for high-priority blocked queues + private Resource blockedHeadroom; + private boolean allowPreempt = false; public ResourceLimits(Resource limit) { @@ -81,4 +84,25 @@ public class ResourceLimits { public void setIsAllowPreemption(boolean allowPreempt) { this.allowPreempt = allowPreempt; } + + public void addBlockedHeadroom(Resource resource) { + if (blockedHeadroom == null) { + blockedHeadroom = Resource.newInstance(0, 0); + } + Resources.addTo(blockedHeadroom, resource); + } + + public Resource getBlockedHeadroom() { + if (blockedHeadroom == null) { + return Resources.none(); + } + return blockedHeadroom; + } + + public Resource getNetLimit() { + if (blockedHeadroom != null) { + return Resources.subtract(limit, blockedHeadroom); + } + return limit; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index e65f0631f6b..7eb1c29f406 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -776,7 +776,6 @@ public class ParentQueue extends AbstractCSQueue { SchedulingMode schedulingMode) { CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; - Resource parentLimits = limits.getLimit(); printChildQueues(); // Try to assign to most 'under-served' sub-queue @@ -790,7 +789,7 @@ public class ParentQueue extends AbstractCSQueue { // Get ResourceLimits of child queue before assign containers ResourceLimits childLimits = - getResourceLimitsOfChild(childQueue, cluster, parentLimits, + getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(), candidates.getPartition()); CSAssignment childAssignment = childQueue.assignContainers(cluster, @@ -812,16 +811,21 @@ public class ParentQueue extends AbstractCSQueue { CSAssignment.SkippedType.QUEUE_LIMIT) { assignment = childAssignment; } + Resource blockedHeadroom = null; + if (childQueue instanceof LeafQueue) { + blockedHeadroom = childLimits.getHeadroom(); + } else { + blockedHeadroom = childLimits.getBlockedHeadroom(); + } Resource resourceToSubtract = Resources.max(resourceCalculator, - cluster, childLimits.getHeadroom(), Resources.none()); + cluster, blockedHeadroom, Resources.none()); + limits.addBlockedHeadroom(resourceToSubtract); if(LOG.isDebugEnabled()) { - LOG.debug("Decrease parentLimits " + parentLimits + + LOG.debug("Decrease parentLimits " + limits.getLimit() + " for " + this.getQueueName() + " by " + resourceToSubtract + " as childQueue=" + childQueue.getQueueName() + " is blocked"); } - parentLimits = Resources.subtract(parentLimits, - resourceToSubtract); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 3d028ee6b6a..f1b4444cb7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java @@ -1052,4 +1052,82 @@ public class TestContainerAllocation { rm1.close(); } + + @Test(timeout = 60000) + public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit() + throws Exception { + /** + * Queue structure: + *
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     *                   |  \
+     *                  c1   c2
+     *           10(max=10)  90
+     * 
+ * Test case: + * Create a cluster with two nodes whose node resource both are + * <10GB, 10core>, create queues as above, among them max-capacity of "c1" + * is 10 and others are all 100, so that max-capacity of queue "c1" is + * <2GB, 2core>, + * submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1, + * submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1, + * app1 and app2 both ask one <2GB, 1core> containers + * + * Now queue "c" has lower capacity percentage than queue "b", the + * allocation sequence will be "a" -> "c" -> "b", queue "c1" has reached + * queue limit so that requests of app1 should be pending + * + * After nm1 do 1 heartbeat, scheduler should allocate one container for + * app2 on nm1. + */ + CapacitySchedulerConfiguration newConf = + (CapacitySchedulerConfiguration) TestUtils + .getConfigurationWithMultipleQueues(conf); + newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c", + new String[] { "c1", "c2" }); + newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10); + newConf + .setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10); + newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90); + newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class, ResourceCalculator.class); + + MockRM rm1 = new MockRM(newConf); + + RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager(); + nodeLabelsManager.init(newConf); + rm1.getRMContext().setNodeLabelManager(nodeLabelsManager); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); + + // launch an app to queue "c1", AM container should be launched on nm1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // launch another app to queue "b", AM container should be launched on nm1 + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + am1.allocate("*", 2 * GB, 1, new ArrayList()); + am2.allocate("*", 2 * GB, 1, new ArrayList()); + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + FiCaSchedulerApp schedulerApp1 = + cs.getApplicationAttempt(am1.getApplicationAttemptId()); + FiCaSchedulerApp schedulerApp2 = + cs.getApplicationAttempt(am2.getApplicationAttemptId()); + + // Do nm1 heartbeats 1 times, will allocate a container on nm1 for app2 + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + rm1.drainEvents(); + Assert.assertEquals(1, schedulerApp1.getLiveContainers().size()); + Assert.assertEquals(2, schedulerApp2.getLiveContainers().size()); + + rm1.close(); + } }