From c04ccf296dab6eaa53f008342e50f8729935a50d Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Tue, 30 Jul 2019 20:00:32 +0000 Subject: [PATCH] YARN-9596: QueueMetrics has incorrect metrics when labelled partitions are involved. Contributed by Muhammad Samir Khan. --- .../scheduler/capacity/CSQueueUtils.java | 61 ++++--- .../TestNodeLabelContainerAllocation.java | 165 ++++++++++++++++-- 2 files changed, 188 insertions(+), 38 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 69cc42ae78e..97398cbb26e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -230,32 +230,26 @@ class CSQueueUtils { } - private static Resource getMaxAvailableResourceToQueue( + private static Resource getMaxAvailableResourceToQueuePartition( final ResourceCalculator rc, RMNodeLabelsManager nlm, CSQueue queue, - Resource cluster) { - Set nodeLabels = queue.getNodeLabelsForQueue(); - Resource totalAvailableResource = Resources.createResource(0, 0); + Resource cluster, String partition) { + // Calculate guaranteed resource for a label in a queue by below logic. + // (total label resource) * (absolute capacity of label in that queue) + Resource queueGuranteedResource = Resources.multiply(nlm + .getResourceByLabel(partition, cluster), queue.getQueueCapacities() + .getAbsoluteCapacity(partition)); - for (String partition : nodeLabels) { - // Calculate guaranteed resource for a label in a queue by below logic. - // (total label resource) * (absolute capacity of label in that queue) - Resource queueGuranteedResource = Resources.multiply(nlm - .getResourceByLabel(partition, cluster), queue.getQueueCapacities() - .getAbsoluteCapacity(partition)); + // Available resource in queue for a specific label will be calculated as + // {(guaranteed resource for a label in a queue) - + // (resource usage of that label in the queue)} + Resource available = (Resources.greaterThan(rc, cluster, + queueGuranteedResource, + queue.getQueueResourceUsage().getUsed(partition))) ? Resources + .componentwiseMax(Resources.subtractFrom(queueGuranteedResource, + queue.getQueueResourceUsage().getUsed(partition)), Resources + .none()) : Resources.none(); - // Available resource in queue for a specific label will be calculated as - // {(guaranteed resource for a label in a queue) - - // (resource usage of that label in the queue)} - // Finally accumulate this available resource to get total. - Resource available = (Resources.greaterThan(rc, cluster, - queueGuranteedResource, - queue.getQueueResourceUsage().getUsed(partition))) ? Resources - .componentwiseMax(Resources.subtractFrom(queueGuranteedResource, - queue.getQueueResourceUsage().getUsed(partition)), Resources - .none()) : Resources.none(); - Resources.addTo(totalAvailableResource, available); - } - return totalAvailableResource; + return available; } /** @@ -285,16 +279,27 @@ class CSQueueUtils { queueResourceUsage.getNodePartitionsSet())) { updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), partition, childQueue); + + // Update queue metrics w.r.t node labels. + // In QueueMetrics, null label is handled the same as NO_LABEL. + // This is because queue metrics for partitions are not tracked. + // In the future, will have to change this when/if queue metrics + // for partitions also get tracked. + childQueue.getMetrics().setAvailableResourcesToQueue( + partition, + getMaxAvailableResourceToQueuePartition(rc, nlm, childQueue, + cluster, partition)); } } else { updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), nodePartition, childQueue); - } - // Update queue metrics w.r.t node labels. In a generic way, we can - // calculate available resource from all labels in cluster. - childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition, - getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster)); + // Same as above. + childQueue.getMetrics().setAvailableResourcesToQueue( + nodePartition, + getMaxAvailableResourceToQueuePartition(rc, nlm, childQueue, + cluster, nodePartition)); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 9cfddd66f66..737db5b0d65 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -1988,6 +1988,15 @@ public class TestNodeLabelContainerAllocation { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = y + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); + assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + LeafQueue leafQueueB = (LeafQueue) cs.getQueue("b"); + assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); + // app1 -> a RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); @@ -1995,7 +2004,6 @@ public class TestNodeLabelContainerAllocation { // app1 asks for 5 partition=x containers am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); @@ -2019,17 +2027,23 @@ public class TestNodeLabelContainerAllocation { Assert.assertEquals(10 * GB, reportNm2.getAvailableResource().getMemorySize()); - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB()); - assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); + + // The total memory tracked by QueueMetrics is 0GB for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); - assertEquals(0 * GB, leafQueue.getMetrics().getUsedAMResourceMB()); - assertEquals(0, leafQueue.getMetrics().getUsedAMResourceVCores()); + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); rm1.close(); } @@ -2120,10 +2134,9 @@ public class TestNodeLabelContainerAllocation { reportNm2.getAvailableResource().getMemorySize()); LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - double delta = 0.0001; // 3GB is used from label x quota. 1.5 GB is remaining from default label. // 2GB is remaining from label x. - assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB()); assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); // app1 asks for 1 default partition container @@ -2139,10 +2152,142 @@ public class TestNodeLabelContainerAllocation { Assert.assertEquals(2, schedulerNode2.getNumContainers()); // 3GB is used from label x quota. 2GB used from default label. - // So total 2.5 GB is remaining. - assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + // So 0.5 GB is remaining from default label. + assertEquals(5 * GB / 10, leafQueue.getMetrics().getAvailableMB()); assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); + // The total memory tracked by QueueMetrics is 10GB + // for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + rm1.close(); + } + + @Test + public void testQueueMetricsWithMixedLabels() throws Exception { + // There is only one queue which can access both default label and label x. + // There are two nodes of 10GB label x and 12GB no label. + // The test is to make sure that the queue metrics is only tracking the + // allocations and availability from default partition + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 100); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + + // set node -> label + // label x exclusivity is set to true + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", true))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + LeafQueue leafQueueA = (LeafQueue) cs.getQueue("a"); + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 5 partition=x containers + am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); + // NM1 do 50 heartbeats + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(12 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // app2 -> a + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a", ""); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // app2 asks for 5 partition= containers + am2.allocate("*", 1 * GB, 5, new ArrayList(), ""); + // NM2 do 50 heartbeats + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode2.getNumContainers()); + + reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(6 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // The total memory tracked by QueueMetrics is 12GB + // for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + // Kill all apps in queue a + cs.killAllAppsInQueue("a"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); rm1.close(); }