YARN-9085. Add Guaranteed and MaxCapacity to CSQueueMetrics
(cherry picked from commit 978ab3e958227220cb6f1a08ae6e7cdb8a46628b) (cherry picked from commit dca69d178dba21c41fd1293187f29143f7e81e19)
This commit is contained in:
parent
6e6f43afd8
commit
7b523e6a77
|
@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metric;
|
|||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
|
@ -46,6 +47,14 @@ public class CSQueueMetrics extends QueueMetrics {
|
|||
MutableGaugeFloat usedCapacity;
|
||||
@Metric("Percent of Absolute Capacity Used")
|
||||
MutableGaugeFloat absoluteUsedCapacity;
|
||||
@Metric("Guaranteed memory in MB")
|
||||
MutableGaugeLong guaranteedMB;
|
||||
@Metric("Guaranteed CPU in virtual cores")
|
||||
MutableGaugeInt guaranteedVCores;
|
||||
@Metric("Maximum memory in MB")
|
||||
MutableGaugeLong maxCapacityMB;
|
||||
@Metric("Maximum CPU in virtual cores")
|
||||
MutableGaugeInt maxCapacityVCores;
|
||||
|
||||
CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
|
||||
boolean enableUserMetrics, Configuration conf) {
|
||||
|
@ -126,6 +135,36 @@ public class CSQueueMetrics extends QueueMetrics {
|
|||
}
|
||||
}
|
||||
|
||||
public long getGuaranteedMB() {
|
||||
return guaranteedMB.value();
|
||||
}
|
||||
|
||||
public int getGuaranteedVCores() {
|
||||
return guaranteedVCores.value();
|
||||
}
|
||||
|
||||
public void setGuaranteedResources(String partition, Resource res) {
|
||||
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
guaranteedMB.set(res.getMemorySize());
|
||||
guaranteedVCores.set(res.getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
public long getMaxCapacityMB() {
|
||||
return maxCapacityMB.value();
|
||||
}
|
||||
|
||||
public int getMaxCapacityVCores() {
|
||||
return maxCapacityVCores.value();
|
||||
}
|
||||
|
||||
public void setMaxCapacityResources(String partition, Resource res) {
|
||||
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
|
||||
maxCapacityMB.set(res.getMemorySize());
|
||||
maxCapacityVCores.set(res.getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized static CSQueueMetrics forQueue(String queueName,
|
||||
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
|
|
|
@ -314,4 +314,21 @@ public class CSQueueUtils {
|
|||
childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition,
|
||||
getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster));
|
||||
}
|
||||
|
||||
/**
|
||||
* Updated configured capacity/max-capacity for queue.
|
||||
* @param rc resource calculator
|
||||
* @param partitionResource total cluster resources for this partition
|
||||
* @param partition partition being updated
|
||||
* @param queue queue
|
||||
*/
|
||||
public static void updateConfiguredCapacityMetrics(ResourceCalculator rc,
|
||||
Resource partitionResource, String partition, AbstractCSQueue queue) {
|
||||
queue.getMetrics().setGuaranteedResources(partition, rc.multiplyAndNormalizeDown(
|
||||
partitionResource, queue.getQueueCapacities().getAbsoluteCapacity(partition),
|
||||
queue.getMinimumAllocation()));
|
||||
queue.getMetrics().setMaxCapacityResources(partition, rc.multiplyAndNormalizeDown(
|
||||
partitionResource, queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition),
|
||||
queue.getMinimumAllocation()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1824,6 +1824,10 @@ public class LeafQueue extends AbstractCSQueue {
|
|||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
this, labelManager, null);
|
||||
// Update configured capacity/max-capacity for default partition only
|
||||
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
|
||||
labelManager.getResourceByLabel(null, clusterResource),
|
||||
RMNodeLabelsManager.NO_LABEL, this);
|
||||
|
||||
// queue metrics are updated, more resource may be available
|
||||
// activate the pending applications if possible
|
||||
|
|
|
@ -911,6 +911,10 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
this, labelManager, null);
|
||||
// Update configured capacity/max-capacity for default partition only
|
||||
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
|
||||
labelManager.getResourceByLabel(null, clusterResource),
|
||||
RMNodeLabelsManager.NO_LABEL, this);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
|
|
|
@ -5106,4 +5106,48 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
|
|||
assertEquals(4, appsInB1.size());
|
||||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCSQueueMetrics() throws Exception {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
cs.setConf(new YarnConfiguration());
|
||||
cs.setRMContext(resourceManager.getRMContext());
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.init(conf);
|
||||
cs.start();
|
||||
|
||||
RMNode n1 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 1, "n1");
|
||||
RMNode n2 = MockNodes.newNodeInfo(0, MockNodes.newResource(50 * GB), 2, "n2");
|
||||
cs.handle(new NodeAddedSchedulerEvent(n1));
|
||||
cs.handle(new NodeAddedSchedulerEvent(n2));
|
||||
|
||||
assertEquals(10240, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
|
||||
assertEquals(71680, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
||||
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
||||
assertEquals(102400, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
||||
|
||||
// Remove a node, metrics should be updated
|
||||
cs.handle(new NodeRemovedSchedulerEvent(n2));
|
||||
assertEquals(5120, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getGuaranteedMB());
|
||||
assertEquals(35840, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getGuaranteedMB());
|
||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a").getMetrics()).getMaxCapacityMB());
|
||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("b1").getMetrics()).getMaxCapacityMB());
|
||||
|
||||
// Add child queue to a, and reinitialize. Metrics should be updated
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a", new String[] {"a1", "a2", "a3"} );
|
||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a2", 30.0f);
|
||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 40.0f);
|
||||
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a3", 50.0f);
|
||||
|
||||
cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null,
|
||||
null, new RMContainerTokenSecretManager(conf),
|
||||
new NMTokenSecretManagerInRM(conf),
|
||||
new ClientToAMTokenSecretManagerInRM(), null));
|
||||
|
||||
assertEquals(1024, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getGuaranteedMB());
|
||||
assertEquals(2048, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getGuaranteedMB());
|
||||
assertEquals(51200, ((CSQueueMetrics)cs.getQueue("a2").getMetrics()).getMaxCapacityMB());
|
||||
assertEquals(25600, ((CSQueueMetrics)cs.getQueue("a3").getMetrics()).getMaxCapacityMB());
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue