From 74665e3a7d7f05644d9a5abad5a3f2d47597d6c8 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 8 Dec 2017 15:07:56 -0800 Subject: [PATCH] YARN-7274. Ability to disable elasticity at leaf queue level. (Zian Chen via wangda) Change-Id: Ic8d43e297f0f5de788b562f7eff8106c5c35e8d2 --- .../scheduler/capacity/CSQueueUtils.java | 16 +- .../TestNodeLabelContainerAllocation.java | 163 +++++++++++++++++ .../scheduler/capacity/TestQueueParsing.java | 168 +++++++++++++++++- 3 files changed, 329 insertions(+), 18 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 6daca5158a3..51e5b1791e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -66,19 +66,9 @@ class CSQueueUtils { private static void capacitiesSanityCheck(String queueName, QueueCapacities queueCapacities) { for (String label : queueCapacities.getExistingNodeLabels()) { - float capacity = queueCapacities.getCapacity(label); - float maximumCapacity = queueCapacities.getMaximumCapacity(label); - if (capacity > maximumCapacity) { - throw new IllegalArgumentException("Illegal queue capacity setting, " - + "(capacity=" + capacity + ") > (maximum-capacity=" - + maximumCapacity + "). When label=[" + label + "]"); - } - - // Actually, this may not needed since we have verified capacity <= - // maximumCapacity. And the way we compute absolute capacity (abs(x) = - // cap(x) * cap(x.parent) * ...) is a monotone increasing function. But - // just keep it here to make sure our compute abs capacity method works - // correctly. + // The only thing we should care about is absolute capacity <= + // absolute max capacity otherwise the absolute max capacity is + // no longer an absolute maximum. float absCapacity = queueCapacities.getAbsoluteCapacity(label); float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label); if (absCapacity > absMaxCapacity) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 71fddfc4a33..1836919d404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -2083,4 +2083,167 @@ public class TestNodeLabelContainerAllocation { rm1.close(); } + + @Test + public void testQueueMetricsWithLabelsDisableElasticity() throws Exception { + /** + * Test case: have a following queue structure: + * + *
+     *
+     *          root
+     *        /      \
+     *       a        b
+     *      (x)      (x)
+     *      / \
+     *     a1 a2
+     *    (x) (x)
+     * 
+ * + * a/b can access x, both of them has max-capacity-on-x = 50 + * + * When doing non-exclusive allocation, app in a (or b) can use 100% of x + * resource. + */ + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 50); + csConf.setMaximumCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 50); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(queueB, 50); + csConf.setMaximumCapacity(queueB, 100); + csConf.setAccessibleNodeLabels(queueB, toSet("x")); + csConf.setCapacityByLabel(queueB, "x", 50); + csConf.setMaximumCapacityByLabel(queueB, "x", 100); + + // Define 2nd-level queues + csConf.setQueues(queueA, new String[] { "a1", + "a2"}); + + final String A1 = queueA + ".a1"; + csConf.setCapacity(A1, 20); + csConf.setMaximumCapacity(A1, 60); + csConf.setAccessibleNodeLabels(A1, toSet("x")); + csConf.setCapacityByLabel(A1, "x", 60); + csConf.setMaximumCapacityByLabel(A1, "x", 30); + + final String A2 = queueA + ".a2"; + csConf.setCapacity(A2, 80); + csConf.setMaximumCapacity(A2, 40); + csConf.setAccessibleNodeLabels(A2, toSet("x")); + csConf.setCapacityByLabel(A2, "x", 40); + csConf.setMaximumCapacityByLabel(A2, "x", 20); + + // set node -> label + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x + // app1 -> a1 + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 6 partition=x containers + am1.allocate("*", 1 * GB, 6, new ArrayList(), "x"); + + // NM1 do 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(14 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Try to launch app2 in a2, asked 2GB, should success + // app2 -> a2 + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); + + // app2 asks for 4 partition=x containers + am2.allocate("*", 1 * GB, 4, new ArrayList(), "x"); + // NM1 do 50 heartbeats + + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + + reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Kill all apps in queue a2 + cs.killAllAppsInQueue("a2"); + rm1.waitForState(app2.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app2.getApplicationId()); + + // Try to launch app3 in a2, asked 6GB, should fail + // app3 -> a2 + RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "a2", "x"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); + + am3.allocate("*", 1 * GB, 6, new ArrayList(), "x"); + // NM1 do 50 heartbeats + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + // app3 cannot preempt more resources restricted by disable elasticity + checkNumOfContainersInAnAppOnGivenNode(4, nm1.getNodeId(), + cs.getApplicationAttempt(am3.getApplicationAttemptId())); + + Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + // Kill all apps in queue a1 + cs.killAllAppsInQueue("a1"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + + // app4 -> a1, try to allocate more than 6GB resource, should fail + RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm1); + + // app3 asks for 7 partition=x containers + am4.allocate("*", 1 * GB, 7, new ArrayList(), "x"); + // NM1 do 50 heartbeats + doNMHeartbeat(rm1, nm1.getNodeId(), 50); + + // app4 should only gets 6GB resource in partition=x + // since elasticity is disabled + checkNumOfContainersInAnAppOnGivenNode(6, nm1.getNodeId(), + cs.getApplicationAttempt(am4.getApplicationAttemptId())); + + Assert.assertEquals(10 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + rm1.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java index 5d167c7900d..add14ab2fdc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java @@ -365,7 +365,76 @@ public class TestQueueParsing { conf.setCapacityByLabel(B3, "red", 25); conf.setCapacityByLabel(B3, "blue", 25); } - + + private void setupQueueConfigurationWithLabelsAndReleaseCheck + (CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "red", 100); + conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "blue", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + // The cap <= max-cap check is not needed + conf.setCapacity(A, 50); + conf.setMaximumCapacity(A, 100); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + conf.setCapacity(B, 50); + conf.setMaximumCapacity(B, 100); + + LOG.info("Setup top-level queues"); + + // Define 2nd-level queues + final String A1 = A + ".a1"; + final String A2 = A + ".a2"; + conf.setQueues(A, new String[] {"a1", "a2"}); + conf.setAccessibleNodeLabels(A, ImmutableSet.of("red", "blue")); + conf.setCapacityByLabel(A, "red", 50); + conf.setMaximumCapacityByLabel(A, "red", 100); + conf.setCapacityByLabel(A, "blue", 30); + conf.setMaximumCapacityByLabel(A, "blue", 50); + + conf.setCapacity(A1, 60); + conf.setMaximumCapacity(A1, 60); + conf.setCapacityByLabel(A1, "red", 60); + conf.setMaximumCapacityByLabel(A1, "red", 30); + conf.setCapacityByLabel(A1, "blue", 100); + conf.setMaximumCapacityByLabel(A1, "blue", 100); + + conf.setCapacity(A2, 40); + conf.setMaximumCapacity(A2, 85); + conf.setAccessibleNodeLabels(A2, ImmutableSet.of("red")); + conf.setCapacityByLabel(A2, "red", 40); + conf.setMaximumCapacityByLabel(A2, "red", 60); + + final String B1 = B + ".b1"; + final String B2 = B + ".b2"; + final String B3 = B + ".b3"; + conf.setQueues(B, new String[] {"b1", "b2", "b3"}); + conf.setAccessibleNodeLabels(B, ImmutableSet.of("red", "blue")); + conf.setCapacityByLabel(B, "red", 50); + conf.setMaximumCapacityByLabel(B, "red", 100); + conf.setCapacityByLabel(B, "blue", 70); + conf.setMaximumCapacityByLabel(B, "blue", 100); + + conf.setCapacity(B1, 10); + conf.setMaximumCapacity(B1, 10); + conf.setCapacityByLabel(B1, "red", 60); + conf.setMaximumCapacityByLabel(B1, "red", 30); + conf.setCapacityByLabel(B1, "blue", 50); + conf.setMaximumCapacityByLabel(B1, "blue", 100); + + conf.setCapacity(B2, 80); + conf.setMaximumCapacity(B2, 40); + conf.setCapacityByLabel(B2, "red", 30); + conf.setCapacityByLabel(B2, "blue", 25); + + conf.setCapacity(B3, 10); + conf.setMaximumCapacity(B3, 25); + conf.setCapacityByLabel(B3, "red", 10); + conf.setCapacityByLabel(B3, "blue", 25); + } + private void setupQueueConfigurationWithLabelsInherit( CapacitySchedulerConfiguration conf) { // Define top-level queues @@ -472,7 +541,7 @@ public class TestQueueParsing { // queue-B2 inherits "red"/"blue" Assert.assertTrue(capacityScheduler.getQueue("b2") .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue"))); - + // check capacity of A2 CSQueue qA2 = capacityScheduler.getQueue("a2"); Assert.assertEquals(0.7, qA2.getCapacity(), DELTA); @@ -481,7 +550,7 @@ public class TestQueueParsing { Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA); Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA); - + // check capacity of B3 CSQueue qB3 = capacityScheduler.getQueue("b3"); Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA); @@ -489,7 +558,71 @@ public class TestQueueParsing { Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA); } - + + private void checkQueueLabelsWithLeafQueueDisableElasticity + (CapacityScheduler capacityScheduler) { + // queue-A is red, blue + Assert.assertTrue(capacityScheduler.getQueue("a").getAccessibleNodeLabels() + .containsAll(ImmutableSet.of("red", "blue"))); + + // queue-A1 inherits A's configuration + Assert.assertTrue(capacityScheduler.getQueue("a1") + .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue"))); + + // queue-A2 is "red" + Assert.assertEquals(1, capacityScheduler.getQueue("a2") + .getAccessibleNodeLabels().size()); + Assert.assertTrue(capacityScheduler.getQueue("a2") + .getAccessibleNodeLabels().contains("red")); + + // queue-B is "red"/"blue" + Assert.assertTrue(capacityScheduler.getQueue("b").getAccessibleNodeLabels() + .containsAll(ImmutableSet.of("red", "blue"))); + + // queue-B2 inherits "red"/"blue" + Assert.assertTrue(capacityScheduler.getQueue("b2") + .getAccessibleNodeLabels().containsAll(ImmutableSet.of("red", "blue"))); + + // check capacity of A2 + CSQueue qA2 = capacityScheduler.getQueue("a2"); + Assert.assertEquals(0.4, qA2.getCapacity(), DELTA); + Assert.assertEquals(0.4, qA2.getQueueCapacities() + .getCapacity("red"), DELTA); + Assert.assertEquals(0.2, qA2.getAbsoluteCapacity(), DELTA); + Assert.assertEquals(0.2, qA2.getQueueCapacities() + .getAbsoluteCapacity("red"), DELTA); + Assert.assertEquals(0.85, qA2.getAbsoluteMaximumCapacity(), DELTA); + Assert.assertEquals(0.6, qA2.getQueueCapacities() + .getAbsoluteMaximumCapacity("red"), DELTA); + + // check disable elasticity at leaf queue level without label + CSQueue qB2 = capacityScheduler.getQueue("b2"); + Assert.assertEquals(0.4, qB2.getAbsoluteCapacity(), DELTA); + Assert.assertEquals(0.4, qB2.getAbsoluteMaximumCapacity(), DELTA); + + // check disable elasticity at leaf queue level with label + CSQueue qA1 = capacityScheduler.getQueue("a1"); + Assert.assertEquals(0.3, qA1.getQueueCapacities(). + getAbsoluteCapacity("red"), DELTA); + Assert.assertEquals(0.3, qA1.getQueueCapacities(). + getAbsoluteMaximumCapacity("red"), DELTA); + + CSQueue qB1 = capacityScheduler.getQueue("b1"); + Assert.assertEquals(0.3, qB1.getQueueCapacities() + .getAbsoluteCapacity("red"), DELTA); + Assert.assertEquals(0.3, qB1.getQueueCapacities() + .getAbsoluteMaximumCapacity("red"), DELTA); + + // check capacity of B3 + CSQueue qB3 = capacityScheduler.getQueue("b3"); + Assert.assertEquals(0.05, qB3.getAbsoluteCapacity(), DELTA); + Assert.assertEquals(0.175, qB3.getQueueCapacities() + .getAbsoluteCapacity("blue"), DELTA); + Assert.assertEquals(0.25, qB3.getAbsoluteMaximumCapacity(), DELTA); + Assert.assertEquals(1, qB3.getQueueCapacities() + .getAbsoluteMaximumCapacity("blue"), DELTA); + } + private void checkQueueLabelsInheritConfig(CapacityScheduler capacityScheduler) { // queue-A is red, blue @@ -514,7 +647,7 @@ public class TestQueueParsing { @Test public void testQueueParsingWithLabels() throws IOException { nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("red", "blue")); - + YarnConfiguration conf = new YarnConfiguration(); CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf); @@ -534,6 +667,31 @@ public class TestQueueParsing { checkQueueLabels(capacityScheduler); ServiceOperations.stopQuietly(capacityScheduler); } + + @Test + public void testQueueParsingWithLeafQueueDisableElasticity() + throws IOException { + nodeLabelManager.addToCluserNodeLabelsWithDefaultExclusivity + (ImmutableSet.of("red", "blue")); + + YarnConfiguration conf = new YarnConfiguration(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(conf); + setupQueueConfigurationWithLabelsAndReleaseCheck(csConf); + CapacityScheduler capacityScheduler = new CapacityScheduler(); + RMContextImpl rmContext = + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), + new ClientToAMTokenSecretManagerInRM(), null); + rmContext.setNodeLabelManager(nodeLabelManager); + capacityScheduler.setConf(csConf); + capacityScheduler.setRMContext(rmContext); + capacityScheduler.init(csConf); + capacityScheduler.start(); + checkQueueLabelsWithLeafQueueDisableElasticity(capacityScheduler); + ServiceOperations.stopQuietly(capacityScheduler); + } @Test public void testQueueParsingWithLabelsInherit() throws IOException {