From e34e1aa4fe0a0826439227175fc3321f840dddd4 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Fri, 15 Jul 2016 11:40:12 -0700 Subject: [PATCH] YARN-4484. Available Resource calculation for a queue is not correct when used with labels. (Sunil G via wangda) (cherry picked from commit 24db9167f16ba643a186624b33a6b9b80020f476) --- .../scheduler/capacity/CSQueueUtils.java | 55 ++--- .../TestNodeLabelContainerAllocation.java | 212 ++++++++++++++++++ 2 files changed, 242 insertions(+), 25 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 0166d833eea..d5cdb3221f0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -227,24 +227,34 @@ class CSQueueUtils { .setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity); } - private static Resource getNonPartitionedMaxAvailableResourceToQueue( - final ResourceCalculator rc, Resource totalNonPartitionedResource, - CSQueue queue) { - Resource queueLimit = Resources.none(); - Resource usedResources = queue.getUsedResources(); + private static Resource getMaxAvailableResourceToQueue( + final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue, + Resource cluster) { + Set nodeLabels = queue.getNodeLabelsForQueue(); + Resource totalAvailableResource = Resources.createResource(0, 0); - if (Resources.greaterThan(rc, totalNonPartitionedResource, - totalNonPartitionedResource, Resources.none())) { - queueLimit = - Resources.multiply(totalNonPartitionedResource, - queue.getAbsoluteCapacity()); + for (String partition : nodeLabels) { + // Calculate guaranteed resource for a label in a queue by below logic. + // (total label resource) * (absolute capacity of label in that queue) + Resource queueGuranteedResource = Resources.multiply(nlm + .getResourceByLabel(partition, cluster), queue.getQueueCapacities() + .getAbsoluteCapacity(partition)); + + // Available resource in queue for a specific label will be calculated as + // {(guaranteed resource for a label in a queue) - + // (resource usage of that label in the queue)} + // Finally accumulate this available resource to get total. + Resource available = (Resources.greaterThan(rc, cluster, + queueGuranteedResource, + queue.getQueueResourceUsage().getUsed(partition))) ? Resources + .componentwiseMax(Resources.subtractFrom(queueGuranteedResource, + queue.getQueueResourceUsage().getUsed(partition)), Resources + .none()) : Resources.none(); + Resources.addTo(totalAvailableResource, available); } - - Resource available = Resources.subtract(queueLimit, usedResources); - return Resources.max(rc, totalNonPartitionedResource, available, - Resources.none()); + return totalAvailableResource; } - + /** *

* Update Queue Statistics: @@ -277,15 +287,10 @@ class CSQueueUtils { updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), minimumAllocation, queueResourceUsage, queueCapacities, nodePartition); } - - // Now in QueueMetrics, we only store available-resource-to-queue for - // default partition. - if (nodePartition == null - || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { - childQueue.getMetrics().setAvailableResourcesToQueue( - getNonPartitionedMaxAvailableResourceToQueue(rc, - nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster), - childQueue)); - } + + // Update queue metrics w.r.t node labels. In a generic way, we can + // calculate available resource from all labels in cluster. + childQueue.getMetrics().setAvailableResourcesToQueue( + getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index cff1514b8c8..47fd5348731 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.junit.Assert.assertEquals; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -47,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -1863,4 +1866,213 @@ public class TestNodeLabelContainerAllocation { checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), cs.getApplicationAttempt(am2.getApplicationAttemptId())); } + + @Test + public void testQueueMetricsWithLabels() throws Exception { + /** + * Test case: have a following queue structure: + * + *

+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * 
+ * + * a/b can access x, both of them has max-capacity-on-x = 50 + * + * When doing non-exclusive allocation, app in a (or b) can use 100% of x + * resource. + */ + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 25); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 50); + csConf.setMaximumCapacityByLabel(queueA, "x", 50); + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(queueB, 75); + csConf.setAccessibleNodeLabels(queueB, toSet("x")); + csConf.setCapacityByLabel(queueB, "x", 50); + csConf.setMaximumCapacityByLabel(queueB, "x", 50); + + // set node -> label + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", false))); + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("y", false))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h2", 0), toSet("y"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 5 partition=x containers + am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); + // NM1 do 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(5, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(5 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(5 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(10 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); + assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB()); + assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); + rm1.close(); + } + + @Test + public void testQueueMetricsWithLabelsOnDefaultLabelNode() throws Exception { + /** + * Test case: have a following queue structure: + * + *
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * 
+ * + * a/b can access x, both of them has max-capacity-on-x = 50 + * + * When doing non-exclusive allocation, app in a (or b) can use 100% of x + * resource. + */ + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 25); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 50); + csConf.setMaximumCapacityByLabel(queueA, "x", 50); + final String queueB = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(queueB, 75); + csConf.setAccessibleNodeLabels(queueB, toSet("x")); + csConf.setCapacityByLabel(queueB, "x", 50); + csConf.setMaximumCapacityByLabel(queueB, "x", 50); + + // set node -> label + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); + + // app1 asks for 3 partition= containers + am1.allocate("*", 1 * GB, 3, new ArrayList()); + + // NM1 do 50 heartbeats + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x (non-exclusive) + Assert.assertEquals(3, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(7 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(9 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); + double delta = 0.0001; + // 3GB is used from label x quota. 1.5 GB is remaining from default label. + // 2GB is remaining from label x. + assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); + + // app1 asks for 1 default partition container + am1.allocate("*", 1 * GB, 5, new ArrayList()); + + // NM2 do couple of heartbeats + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + + // app1 gets all resource in default partition + Assert.assertEquals(2, schedulerNode2.getNumContainers()); + + // 3GB is used from label x quota. 2GB used from default label. + // So total 2.5 GB is remaining. + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); + + rm1.close(); + } }