From b5235f1ed05f6506aa2c40825b2b647475a4d91a Mon Sep 17 00:00:00 2001 From: Jonathan Hung Date: Sun, 8 Dec 2019 15:52:17 -0800 Subject: [PATCH] YARN-10012. Guaranteed and max capacity queue metrics for custom resources (cherry picked from commit 92bce918dc5d03560169642b71636800680a3292) --- .../CSQueueMetricsForCustomResources.java | 50 +++++ .../scheduler/QueueMetrics.java | 52 +++-- .../scheduler/capacity/CSQueueMetrics.java | 53 ++++++ .../server/resourcemanager/MockNodes.java | 7 + .../TestQueueMetricsForCustomResources.java | 4 +- .../capacity/TestCapacityScheduler.java | 178 ++++++++++++++++-- 6 files changed, 306 insertions(+), 38 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/CSQueueMetricsForCustomResources.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/CSQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/CSQueueMetricsForCustomResources.java new file mode 100644 index 00000000000..77fe42bb402 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/CSQueueMetricsForCustomResources.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +import java.util.Map; + +/** + * This class is a main entry-point for any kind of CSQueueMetrics for + * custom resources. + * It provides increase and decrease methods for all types of metrics. + */ +public class CSQueueMetricsForCustomResources + extends QueueMetricsForCustomResources { + private final QueueMetricsCustomResource guaranteedCapacity = + new QueueMetricsCustomResource(); + private final QueueMetricsCustomResource maxCapacity = + new QueueMetricsCustomResource(); + + public void setGuaranteedCapacity(Resource res) { + guaranteedCapacity.set(res); + } + + public void setMaxCapacity(Resource res) { + maxCapacity.set(res); + } + + public Map getGuaranteedCapacity() { + return guaranteedCapacity.getValues(); + } + + public Map getMaxCapacity() { + return maxCapacity.getValues(); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 094444f6b20..b05a0ae1f49 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -161,11 +161,7 @@ public class QueueMetrics implements MetricsSource { this.conf = conf; runningTime = buildBuckets(conf); - if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { - this.queueMetricsForCustomResources = - new QueueMetricsForCustomResources(); - registerCustomResources(); - } + createQueueMetricsForCustomResources(); } protected QueueMetrics tag(MetricsInfo info, String value) { @@ -451,25 +447,33 @@ public class QueueMetrics implements MetricsSource { } } + protected Map initAndGetCustomResources() { + Map customResources = new HashMap(); + ResourceInformation[] resources = ResourceUtils.getResourceTypesArray(); + + for (int i = 2; i < resources.length; i++) { + ResourceInformation resource = resources[i]; + customResources.put(resource.getName(), Long.valueOf(0)); + } + return customResources; + } + + protected void createQueueMetricsForCustomResources() { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + this.queueMetricsForCustomResources = + new QueueMetricsForCustomResources(); + registerCustomResources(); + } + } + /** * Register all custom resources metrics as part of initialization. As and * when this metric object construction happens for any queue, all custom * resource metrics value would be initialized with '0' like any other * mandatory resources metrics */ - private void registerCustomResources() { - Map customResources = - new HashMap(); - ResourceInformation[] resources = - ResourceUtils.getResourceTypesArray(); - - for (int i = - 2; i < resources.length; i++) { - ResourceInformation resource = - resources[i]; - customResources.put(resource.getName(), Long.valueOf(0)); - } - + protected void registerCustomResources() { + Map customResources = initAndGetCustomResources(); registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX, @@ -483,7 +487,7 @@ public class QueueMetrics implements MetricsSource { AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); } - private void registerCustomResources(Map customResources, + protected void registerCustomResources(Map customResources, String metricPrefix, String metricDesc) { for (Entry entry : customResources.entrySet()) { String resourceName = entry.getKey(); @@ -966,4 +970,14 @@ public class QueueMetrics implements MetricsSource { public long getAggregateVcoresPreempted() { return aggregateVcoresPreempted.value(); } + + @VisibleForTesting + public QueueMetricsForCustomResources getQueueMetricsForCustomResources() { + return this.queueMetricsForCustomResources; + } + + public void setQueueMetricsForCustomResources( + QueueMetricsForCustomResources metrics) { + this.queueMetricsForCustomResources = metrics; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index c50a1b145d3..e9a0aafe6ee 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; @@ -28,8 +30,10 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; @Metrics(context = "yarn") public class CSQueueMetrics extends QueueMetrics { @@ -64,11 +68,36 @@ public class CSQueueMetrics extends QueueMetrics { @Metric("Maximum capacity in percentage relative to total partition") private MutableGaugeFloat maxAbsoluteCapacity; + private static final String GUARANTEED_CAPACITY_METRIC_PREFIX = + "GuaranteedCapacity."; + private static final String GUARANTEED_CAPACITY_METRIC_DESC = + "GuaranteedCapacity of NAME"; + + private static final String MAX_CAPACITY_METRIC_PREFIX = + "MaxCapacity."; + private static final String MAX_CAPACITY_METRIC_DESC = + "MaxCapacity of NAME"; + CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { super(ms, queueName, parent, enableUserMetrics, conf); } + /** + * Register all custom resources metrics as part of initialization. As and + * when this metric object construction happens for any queue, all custom + * resource metrics value would be initialized with '0' like any other + * mandatory resources metrics + */ + protected void registerCustomResources() { + Map customResources = initAndGetCustomResources(); + registerCustomResources(customResources, GUARANTEED_CAPACITY_METRIC_PREFIX, + GUARANTEED_CAPACITY_METRIC_DESC); + registerCustomResources(customResources, MAX_CAPACITY_METRIC_PREFIX, + MAX_CAPACITY_METRIC_DESC); + super.registerCustomResources(); + } + public long getAMResourceLimitMB() { return AMResourceLimitMB.value(); } @@ -155,6 +184,14 @@ public class CSQueueMetrics extends QueueMetrics { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { guaranteedMB.set(res.getMemorySize()); guaranteedVCores.set(res.getVirtualCores()); + if (getQueueMetricsForCustomResources() != null) { + ((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources()) + .setGuaranteedCapacity(res); + registerCustomResources( + ((CSQueueMetricsForCustomResources) + getQueueMetricsForCustomResources()).getGuaranteedCapacity(), + GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC); + } } } @@ -170,6 +207,22 @@ public class CSQueueMetrics extends QueueMetrics { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { maxCapacityMB.set(res.getMemorySize()); maxCapacityVCores.set(res.getVirtualCores()); + if (getQueueMetricsForCustomResources() != null) { + ((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources()) + .setMaxCapacity(res); + registerCustomResources( + ((CSQueueMetricsForCustomResources) + getQueueMetricsForCustomResources()).getMaxCapacity(), + MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC); + } + } + } + + @Override + protected void createQueueMetricsForCustomResources() { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + setQueueMetricsForCustomResources(new CSQueueMetricsForCustomResources()); + registerCustomResources(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index a8719932f3f..e8d352ffa9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -36,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -92,6 +93,12 @@ public class MockNodes { return rs; } + public static Resource newResource(long memory, int vCores, + Map customResources) { + return ResourceTypesTestHelper.newResource(memory, vCores, customResources); + } + + public static Resource newUsedResource(Resource total) { Resource rs = recordFactory.newRecordInstance(Resource.class); rs.setMemorySize((int)(Math.random() * total.getMemorySize())); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java index ed0ae6e3305..b6441f38332 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java @@ -95,8 +95,8 @@ public class TestQueueMetricsForCustomResources { public static final long GB = 1024; // MB private static final Configuration CONF = new Configuration(); - private static final String CUSTOM_RES_1 = "custom_res_1"; - private static final String CUSTOM_RES_2 = "custom_res_2"; + public static final String CUSTOM_RES_1 = "custom_res_1"; + public static final String CUSTOM_RES_2 = "custom_res_2"; public static final String USER = "alice"; private Resource defaultResource; private MetricsSystem ms; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 73f6fc15566..7ef8b9eae4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -79,6 +79,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; @@ -108,7 +109,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMW import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; - +import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -127,6 +128,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; @@ -134,6 +136,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState; @@ -190,6 +193,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { @Before public void setUp() throws Exception { + ResourceUtils.resetResourceTypes(new Configuration()); resourceManager = new ResourceManager() { @Override protected RMNodeLabelsManager createNodeLabelManager() { @@ -5178,23 +5182,109 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { @Test public void testCSQueueMetrics() throws Exception { - CapacityScheduler cs = new CapacityScheduler(); - cs.setConf(new YarnConfiguration()); - cs.setRMContext(resourceManager.getRMContext()); - CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); - setupQueueConfiguration(conf); - cs.init(conf); - cs.start(); - RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 1, "n1"); - RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 2, "n2"); + // Initialize resource map + Map riMap = new HashMap<>(); + + // Initialize mandatory resources + ResourceInformation memory = + ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = + ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1, + ResourceInformation.newInstance( + TestQueueMetricsForCustomResources.CUSTOM_RES_1, "", 1, 10)); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + + csConf.set(YarnConfiguration.RESOURCE_TYPES, + TestQueueMetricsForCustomResources.CUSTOM_RES_1); + + setupQueueConfiguration(csConf); + + YarnConfiguration conf = new YarnConfiguration(csConf); + + // Don't reset resource types since we have already configured resource + // types + conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + RMNode n1 = MockNodes.newNodeInfo(0, + MockNodes.newResource(50 * GB, 50, + ImmutableMap. builder() + .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1, + String.valueOf(1000)) + .build()), + 1, "n1"); + RMNode n2 = MockNodes.newNodeInfo(0, + MockNodes.newResource(50 * GB, 50, + ImmutableMap. builder() + .put(TestQueueMetricsForCustomResources.CUSTOM_RES_1, + String.valueOf(2000)) + .build()), + 2, "n2"); cs.handle(new NodeAddedSchedulerEvent(n1)); cs.handle(new NodeAddedSchedulerEvent(n2)); + Map guaranteedCapA11 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + assertEquals(94, guaranteedCapA11 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapA11 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(3000, maxCapA11 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB()); assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB()); assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB()); assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB()); + Map guaranteedCapA = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + assertEquals(314, guaranteedCapA + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapA = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(3000, maxCapA + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map guaranteedCapB1 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + assertEquals(2126, guaranteedCapB1 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapB1 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(3000, maxCapB1 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); // Remove a node, metrics should be updated cs.handle(new NodeRemovedSchedulerEvent(n2)); @@ -5202,6 +5292,31 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB()); assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB()); assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB()); + Map guaranteedCapA1 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + + assertEquals(104, guaranteedCapA1 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapA1 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(1000, maxCapA1 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map guaranteedCapB11 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + assertEquals(708, guaranteedCapB11 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapB11 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(1000, maxCapB11 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a") .getMetrics()).getGuaranteedCapacity(), DELTA); assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a") @@ -5220,20 +5335,49 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { .getMaxAbsoluteCapacity(), DELTA); // Add child queue to a, and reinitialize. Metrics should be updated - conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", new String[] {"a1", "a2", "a3"} ); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 30.0f); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.0f); - conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 50.0f); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", + new String[] {"a1", "a2", "a3"}); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 29.5f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.5f); + csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", + 50.0f); - cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), + cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null, + null, new RMContainerTokenSecretManager(csConf), + new NMTokenSecretManagerInRM(csConf), new ClientToAMTokenSecretManagerInRM(), null)); assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB()); assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB()); assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB()); assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB()); + + Map guaranteedCapA2 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + assertEquals(30, guaranteedCapA2 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapA2 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(1000, maxCapA2 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + + Map guaranteedCapA3 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3") + .getMetrics()).getQueueMetricsForCustomResources()) + .getGuaranteedCapacity(); + assertEquals(42, guaranteedCapA3 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + Map maxCapA3 = + ((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3") + .getMetrics()).getQueueMetricsForCustomResources()) + .getMaxCapacity(); + assertEquals(500, maxCapA3 + .get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue()); + rm.stop(); } @Test