YARN-10012. Guaranteed and max capacity queue metrics for custom resources. Contributed by Manikandan R
(cherry picked from commit92bce918dc
) (cherry picked from commit9228e3f0ad
)
This commit is contained in:
parent
67cf1f94cd
commit
9b4368a62f
|
@ -0,0 +1,50 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is a main entry-point for any kind of CSQueueMetrics for
|
||||||
|
* custom resources.
|
||||||
|
* It provides increase and decrease methods for all types of metrics.
|
||||||
|
*/
|
||||||
|
public class CSQueueMetricsForCustomResources
|
||||||
|
extends QueueMetricsForCustomResources {
|
||||||
|
private final QueueMetricsCustomResource guaranteedCapacity =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
private final QueueMetricsCustomResource maxCapacity =
|
||||||
|
new QueueMetricsCustomResource();
|
||||||
|
|
||||||
|
public void setGuaranteedCapacity(Resource res) {
|
||||||
|
guaranteedCapacity.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxCapacity(Resource res) {
|
||||||
|
maxCapacity.set(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getGuaranteedCapacity() {
|
||||||
|
return guaranteedCapacity.getValues();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Long> getMaxCapacity() {
|
||||||
|
return maxCapacity.getValues();
|
||||||
|
}
|
||||||
|
}
|
|
@ -161,11 +161,7 @@ public class QueueMetrics implements MetricsSource {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
runningTime = buildBuckets(conf);
|
runningTime = buildBuckets(conf);
|
||||||
|
|
||||||
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
createQueueMetricsForCustomResources();
|
||||||
this.queueMetricsForCustomResources =
|
|
||||||
new QueueMetricsForCustomResources();
|
|
||||||
registerCustomResources();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected QueueMetrics tag(MetricsInfo info, String value) {
|
protected QueueMetrics tag(MetricsInfo info, String value) {
|
||||||
|
@ -451,25 +447,33 @@ public class QueueMetrics implements MetricsSource {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected Map<String, Long> initAndGetCustomResources() {
|
||||||
|
Map<String, Long> customResources = new HashMap<String, Long>();
|
||||||
|
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
|
||||||
|
|
||||||
|
for (int i = 2; i < resources.length; i++) {
|
||||||
|
ResourceInformation resource = resources[i];
|
||||||
|
customResources.put(resource.getName(), Long.valueOf(0));
|
||||||
|
}
|
||||||
|
return customResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void createQueueMetricsForCustomResources() {
|
||||||
|
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||||
|
this.queueMetricsForCustomResources =
|
||||||
|
new QueueMetricsForCustomResources();
|
||||||
|
registerCustomResources();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register all custom resources metrics as part of initialization. As and
|
* Register all custom resources metrics as part of initialization. As and
|
||||||
* when this metric object construction happens for any queue, all custom
|
* when this metric object construction happens for any queue, all custom
|
||||||
* resource metrics value would be initialized with '0' like any other
|
* resource metrics value would be initialized with '0' like any other
|
||||||
* mandatory resources metrics
|
* mandatory resources metrics
|
||||||
*/
|
*/
|
||||||
private void registerCustomResources() {
|
protected void registerCustomResources() {
|
||||||
Map<String, Long> customResources =
|
Map<String, Long> customResources = initAndGetCustomResources();
|
||||||
new HashMap<String, Long>();
|
|
||||||
ResourceInformation[] resources =
|
|
||||||
ResourceUtils.getResourceTypesArray();
|
|
||||||
|
|
||||||
for (int i =
|
|
||||||
2; i < resources.length; i++) {
|
|
||||||
ResourceInformation resource =
|
|
||||||
resources[i];
|
|
||||||
customResources.put(resource.getName(), Long.valueOf(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX,
|
registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX,
|
||||||
ALLOCATED_RESOURCE_METRIC_DESC);
|
ALLOCATED_RESOURCE_METRIC_DESC);
|
||||||
registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX,
|
registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX,
|
||||||
|
@ -483,7 +487,7 @@ public class QueueMetrics implements MetricsSource {
|
||||||
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerCustomResources(Map<String, Long> customResources,
|
protected void registerCustomResources(Map<String, Long> customResources,
|
||||||
String metricPrefix, String metricDesc) {
|
String metricPrefix, String metricDesc) {
|
||||||
for (Entry<String, Long> entry : customResources.entrySet()) {
|
for (Entry<String, Long> entry : customResources.entrySet()) {
|
||||||
String resourceName = entry.getKey();
|
String resourceName = entry.getKey();
|
||||||
|
@ -966,4 +970,14 @@ public class QueueMetrics implements MetricsSource {
|
||||||
public long getAggregateVcoresPreempted() {
|
public long getAggregateVcoresPreempted() {
|
||||||
return aggregateVcoresPreempted.value();
|
return aggregateVcoresPreempted.value();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public QueueMetricsForCustomResources getQueueMetricsForCustomResources() {
|
||||||
|
return this.queueMetricsForCustomResources;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setQueueMetricsForCustomResources(
|
||||||
|
QueueMetricsForCustomResources metrics) {
|
||||||
|
this.queueMetricsForCustomResources = metrics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
|
@ -28,8 +30,10 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
|
|
||||||
@Metrics(context = "yarn")
|
@Metrics(context = "yarn")
|
||||||
public class CSQueueMetrics extends QueueMetrics {
|
public class CSQueueMetrics extends QueueMetrics {
|
||||||
|
@ -64,11 +68,36 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||||
@Metric("Maximum capacity in percentage relative to total partition")
|
@Metric("Maximum capacity in percentage relative to total partition")
|
||||||
private MutableGaugeFloat maxAbsoluteCapacity;
|
private MutableGaugeFloat maxAbsoluteCapacity;
|
||||||
|
|
||||||
|
private static final String GUARANTEED_CAPACITY_METRIC_PREFIX =
|
||||||
|
"GuaranteedCapacity.";
|
||||||
|
private static final String GUARANTEED_CAPACITY_METRIC_DESC =
|
||||||
|
"GuaranteedCapacity of NAME";
|
||||||
|
|
||||||
|
private static final String MAX_CAPACITY_METRIC_PREFIX =
|
||||||
|
"MaxCapacity.";
|
||||||
|
private static final String MAX_CAPACITY_METRIC_DESC =
|
||||||
|
"MaxCapacity of NAME";
|
||||||
|
|
||||||
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||||
boolean enableUserMetrics, Configuration conf) {
|
boolean enableUserMetrics, Configuration conf) {
|
||||||
super(ms, queueName, parent, enableUserMetrics, conf);
|
super(ms, queueName, parent, enableUserMetrics, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Register all custom resources metrics as part of initialization. As and
|
||||||
|
* when this metric object construction happens for any queue, all custom
|
||||||
|
* resource metrics value would be initialized with '0' like any other
|
||||||
|
* mandatory resources metrics
|
||||||
|
*/
|
||||||
|
protected void registerCustomResources() {
|
||||||
|
Map<String, Long> customResources = initAndGetCustomResources();
|
||||||
|
registerCustomResources(customResources, GUARANTEED_CAPACITY_METRIC_PREFIX,
|
||||||
|
GUARANTEED_CAPACITY_METRIC_DESC);
|
||||||
|
registerCustomResources(customResources, MAX_CAPACITY_METRIC_PREFIX,
|
||||||
|
MAX_CAPACITY_METRIC_DESC);
|
||||||
|
super.registerCustomResources();
|
||||||
|
}
|
||||||
|
|
||||||
public long getAMResourceLimitMB() {
|
public long getAMResourceLimitMB() {
|
||||||
return AMResourceLimitMB.value();
|
return AMResourceLimitMB.value();
|
||||||
}
|
}
|
||||||
|
@ -155,6 +184,14 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||||
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||||
guaranteedMB.set(res.getMemorySize());
|
guaranteedMB.set(res.getMemorySize());
|
||||||
guaranteedVCores.set(res.getVirtualCores());
|
guaranteedVCores.set(res.getVirtualCores());
|
||||||
|
if (getQueueMetricsForCustomResources() != null) {
|
||||||
|
((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources())
|
||||||
|
.setGuaranteedCapacity(res);
|
||||||
|
registerCustomResources(
|
||||||
|
((CSQueueMetricsForCustomResources)
|
||||||
|
getQueueMetricsForCustomResources()).getGuaranteedCapacity(),
|
||||||
|
GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -170,6 +207,22 @@ public class CSQueueMetrics extends QueueMetrics {
|
||||||
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||||
maxCapacityMB.set(res.getMemorySize());
|
maxCapacityMB.set(res.getMemorySize());
|
||||||
maxCapacityVCores.set(res.getVirtualCores());
|
maxCapacityVCores.set(res.getVirtualCores());
|
||||||
|
if (getQueueMetricsForCustomResources() != null) {
|
||||||
|
((CSQueueMetricsForCustomResources) getQueueMetricsForCustomResources())
|
||||||
|
.setMaxCapacity(res);
|
||||||
|
registerCustomResources(
|
||||||
|
((CSQueueMetricsForCustomResources)
|
||||||
|
getQueueMetricsForCustomResources()).getMaxCapacity(),
|
||||||
|
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createQueueMetricsForCustomResources() {
|
||||||
|
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||||
|
setQueueMetricsForCustomResources(new CSQueueMetricsForCustomResources());
|
||||||
|
registerCustomResources();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||||
|
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
@ -91,6 +92,12 @@ public class MockNodes {
|
||||||
return rs;
|
return rs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Resource newResource(long memory, int vCores,
|
||||||
|
Map<String, String> customResources) {
|
||||||
|
return ResourceTypesTestHelper.newResource(memory, vCores, customResources);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static Resource newUsedResource(Resource total) {
|
public static Resource newUsedResource(Resource total) {
|
||||||
Resource rs = recordFactory.newRecordInstance(Resource.class);
|
Resource rs = recordFactory.newRecordInstance(Resource.class);
|
||||||
rs.setMemorySize((int)(Math.random() * total.getMemorySize()));
|
rs.setMemorySize((int)(Math.random() * total.getMemorySize()));
|
||||||
|
|
|
@ -95,8 +95,8 @@ public class TestQueueMetricsForCustomResources {
|
||||||
|
|
||||||
public static final long GB = 1024; // MB
|
public static final long GB = 1024; // MB
|
||||||
private static final Configuration CONF = new Configuration();
|
private static final Configuration CONF = new Configuration();
|
||||||
private static final String CUSTOM_RES_1 = "custom_res_1";
|
public static final String CUSTOM_RES_1 = "custom_res_1";
|
||||||
private static final String CUSTOM_RES_2 = "custom_res_2";
|
public static final String CUSTOM_RES_2 = "custom_res_2";
|
||||||
public static final String USER = "alice";
|
public static final String USER = "alice";
|
||||||
private Resource defaultResource;
|
private Resource defaultResource;
|
||||||
private MetricsSystem ms;
|
private MetricsSystem ms;
|
||||||
|
|
|
@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||||
|
@ -109,6 +110,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||||
.UserGroupMappingPlacementRule;
|
.UserGroupMappingPlacementRule;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
||||||
|
@ -127,6 +129,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.CSQueueMetricsForCustomResources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
@ -134,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
|
||||||
|
@ -190,6 +194,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
|
ResourceUtils.resetResourceTypes(new Configuration());
|
||||||
resourceManager = new ResourceManager() {
|
resourceManager = new ResourceManager() {
|
||||||
@Override
|
@Override
|
||||||
protected RMNodeLabelsManager createNodeLabelManager() {
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
||||||
|
@ -5155,23 +5160,109 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCSQueueMetrics() throws Exception {
|
public void testCSQueueMetrics() throws Exception {
|
||||||
CapacityScheduler cs = new CapacityScheduler();
|
|
||||||
cs.setConf(new YarnConfiguration());
|
|
||||||
cs.setRMContext(resourceManager.getRMContext());
|
|
||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
|
||||||
setupQueueConfiguration(conf);
|
|
||||||
cs.init(conf);
|
|
||||||
cs.start();
|
|
||||||
|
|
||||||
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 1, "n1");
|
// Initialize resource map
|
||||||
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 2, "n2");
|
Map<String, ResourceInformation> riMap = new HashMap<>();
|
||||||
|
|
||||||
|
// Initialize mandatory resources
|
||||||
|
ResourceInformation memory =
|
||||||
|
ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(),
|
||||||
|
ResourceInformation.MEMORY_MB.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
|
||||||
|
ResourceInformation vcores =
|
||||||
|
ResourceInformation.newInstance(ResourceInformation.VCORES.getName(),
|
||||||
|
ResourceInformation.VCORES.getUnits(),
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
|
||||||
|
riMap.put(ResourceInformation.MEMORY_URI, memory);
|
||||||
|
riMap.put(ResourceInformation.VCORES_URI, vcores);
|
||||||
|
riMap.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
||||||
|
ResourceInformation.newInstance(
|
||||||
|
TestQueueMetricsForCustomResources.CUSTOM_RES_1, "", 1, 10));
|
||||||
|
|
||||||
|
ResourceUtils.initializeResourcesFromResourceInformationMap(riMap);
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csConf.setResourceComparator(DominantResourceCalculator.class);
|
||||||
|
|
||||||
|
csConf.set(YarnConfiguration.RESOURCE_TYPES,
|
||||||
|
TestQueueMetricsForCustomResources.CUSTOM_RES_1);
|
||||||
|
|
||||||
|
setupQueueConfiguration(csConf);
|
||||||
|
|
||||||
|
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||||
|
|
||||||
|
// Don't reset resource types since we have already configured resource
|
||||||
|
// types
|
||||||
|
conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false);
|
||||||
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
|
||||||
|
MockRM rm = new MockRM(conf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
|
||||||
|
RMNode n1 = MockNodes.newNodeInfo(0,
|
||||||
|
MockNodes.newResource(50 * GB, 50,
|
||||||
|
ImmutableMap.<String, String> builder()
|
||||||
|
.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
||||||
|
String.valueOf(1000))
|
||||||
|
.build()),
|
||||||
|
1, "n1");
|
||||||
|
RMNode n2 = MockNodes.newNodeInfo(0,
|
||||||
|
MockNodes.newResource(50 * GB, 50,
|
||||||
|
ImmutableMap.<String, String> builder()
|
||||||
|
.put(TestQueueMetricsForCustomResources.CUSTOM_RES_1,
|
||||||
|
String.valueOf(2000))
|
||||||
|
.build()),
|
||||||
|
2, "n2");
|
||||||
cs.handle(new NodeAddedSchedulerEvent(n1));
|
cs.handle(new NodeAddedSchedulerEvent(n1));
|
||||||
cs.handle(new NodeAddedSchedulerEvent(n2));
|
cs.handle(new NodeAddedSchedulerEvent(n2));
|
||||||
|
|
||||||
|
Map<String, Long> guaranteedCapA11 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
assertEquals(94, guaranteedCapA11
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapA11 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a1")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(3000, maxCapA11
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
|
||||||
assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
|
assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
|
||||||
assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
||||||
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
||||||
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
||||||
|
Map<String, Long> guaranteedCapA =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
assertEquals(314, guaranteedCapA
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapA =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(3000, maxCapA
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> guaranteedCapB1 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
assertEquals(2126, guaranteedCapB1
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapB1 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(3000, maxCapB1
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
|
||||||
// Remove a node, metrics should be updated
|
// Remove a node, metrics should be updated
|
||||||
cs.handle(new NodeRemovedSchedulerEvent(n2));
|
cs.handle(new NodeRemovedSchedulerEvent(n2));
|
||||||
|
@ -5179,6 +5270,31 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
||||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
||||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
||||||
|
Map<String, Long> guaranteedCapA1 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
|
||||||
|
assertEquals(104, guaranteedCapA1
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapA1 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(1000, maxCapA1
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> guaranteedCapB11 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
assertEquals(708, guaranteedCapB11
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapB11 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("b1")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(1000, maxCapB11
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
||||||
.getMetrics()).getGuaranteedCapacity(), DELTA);
|
.getMetrics()).getGuaranteedCapacity(), DELTA);
|
||||||
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
assertEquals(A_CAPACITY / 100, ((CSQueueMetrics)cs.getQueue("a")
|
||||||
|
@ -5197,20 +5313,49 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
||||||
.getMaxAbsoluteCapacity(), DELTA);
|
.getMaxAbsoluteCapacity(), DELTA);
|
||||||
|
|
||||||
// Add child queue to a, and reinitialize. Metrics should be updated
|
// Add child queue to a, and reinitialize. Metrics should be updated
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", new String[] {"a1", "a2", "a3"} );
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a",
|
||||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 30.0f);
|
new String[] {"a1", "a2", "a3"});
|
||||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.0f);
|
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 29.5f);
|
||||||
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 50.0f);
|
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.5f);
|
||||||
|
csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3",
|
||||||
|
50.0f);
|
||||||
|
|
||||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
cs.reinitialize(csConf, new RMContextImpl(null, null, null, null, null,
|
||||||
null, new RMContainerTokenSecretManager(conf),
|
null, new RMContainerTokenSecretManager(csConf),
|
||||||
new NMTokenSecretManagerInRM(conf),
|
new NMTokenSecretManagerInRM(csConf),
|
||||||
new ClientToAMTokenSecretManagerInRM(), null));
|
new ClientToAMTokenSecretManagerInRM(), null));
|
||||||
|
|
||||||
assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB());
|
assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB());
|
||||||
assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB());
|
assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB());
|
||||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
||||||
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
||||||
|
|
||||||
|
Map<String, Long> guaranteedCapA2 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
assertEquals(30, guaranteedCapA2
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapA2 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a2")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(1000, maxCapA2
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
|
||||||
|
Map<String, Long> guaranteedCapA3 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getGuaranteedCapacity();
|
||||||
|
assertEquals(42, guaranteedCapA3
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
Map<String, Long> maxCapA3 =
|
||||||
|
((CSQueueMetricsForCustomResources) ((CSQueueMetrics) cs.getQueue("a3")
|
||||||
|
.getMetrics()).getQueueMetricsForCustomResources())
|
||||||
|
.getMaxCapacity();
|
||||||
|
assertEquals(500, maxCapA3
|
||||||
|
.get(TestQueueMetricsForCustomResources.CUSTOM_RES_1).longValue());
|
||||||
|
rm.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue