From 23b72d8ae1c686a5416f772f1f93988388701c64 Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Wed, 16 Oct 2019 21:10:08 +0000 Subject: [PATCH] YARN-9773: Add QueueMetrics for Custom Resources. Contributed by Manikandan R. (cherry picked from commit a5034c7988b6bf54bb7dab208100a2d205e3929e) --- .../scheduler/QueueMetrics.java | 112 +++++++++++++++++- .../scheduler/ResourceMetricsChecker.java | 67 ++++++++--- .../scheduler/TestQueueMetrics.java | 60 ++++------ .../TestQueueMetricsForCustomResources.java | 57 ++++++++- 4 files changed, 240 insertions(+), 56 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index bb8d1d3a8e3..707cafaf924 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.metrics2.lib.Interns.info; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +44,7 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -123,6 +125,31 @@ public class QueueMetrics implements MetricsSource { protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + private static final String ALLOCATED_RESOURCE_METRIC_PREFIX = + "AllocatedResource."; + private static final String ALLOCATED_RESOURCE_METRIC_DESC = + "Allocated NAME"; + + private static final String AVAILABLE_RESOURCE_METRIC_PREFIX = + "AvailableResource."; + private static final String AVAILABLE_RESOURCE_METRIC_DESC = + "Available NAME"; + + private static final String PENDING_RESOURCE_METRIC_PREFIX = + "PendingResource."; + private static final String PENDING_RESOURCE_METRIC_DESC = + "Pending NAME"; + + private static final String RESERVED_RESOURCE_METRIC_PREFIX = + "ReservedResource."; + private static final String RESERVED_RESOURCE_METRIC_DESC = + "Reserved NAME"; + + private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX = + "AggregatePreemptedSeconds."; + private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = + "Aggregate Preempted Seconds for NAME"; + protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { registry = new MetricsRegistry(RECORD_INFO); @@ -137,6 +164,7 @@ public class QueueMetrics implements MetricsSource { if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { this.queueMetricsForCustomResources = new QueueMetricsForCustomResources(); + registerCustomResources(); } } @@ -368,6 +396,9 @@ public class QueueMetrics implements MetricsSource { availableVCores.set(limit.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.setAvailable(limit); + registerCustomResources( + queueMetricsForCustomResources.getAvailableValues(), + AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); } } } @@ -420,16 +451,67 @@ public class QueueMetrics implements MetricsSource { } } + /** + * Register all custom resources metrics as part of initialization. As and + * when this metric object construction happens for any queue, all custom + * resource metrics value would be initialized with '0' like any other + * mandatory resources metrics + */ + private void registerCustomResources() { + Map customResources = + new HashMap(); + ResourceInformation[] resources = + ResourceUtils.getResourceTypesArray(); + + for (int i = + 2; i < resources.length; i++) { + ResourceInformation resource = + resources[i]; + customResources.put(resource.getName(), new Long(0)); + } + + registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX, + ALLOCATED_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX, + AVAILABLE_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, PENDING_RESOURCE_METRIC_PREFIX, + PENDING_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX, + RESERVED_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, + AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX, + AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); + } + + private void registerCustomResources(Map customResources, + String metricPrefix, String metricDesc) { + for (Entry entry : customResources.entrySet()) { + String resourceName = entry.getKey(); + Long resourceValue = entry.getValue(); + + MutableGaugeLong resourceMetric = + (MutableGaugeLong) this.registry.get(metricPrefix + resourceName); + + if (resourceMetric == null) { + resourceMetric = + this.registry.newGauge(metricPrefix + resourceName, + metricDesc.replace("NAME", resourceName), 0L); + } + resourceMetric.set(resourceValue); + } + } + private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increasePending(res, containers); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); } } - public void decrPendingResources(String partition, String user, int containers, Resource res) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { @@ -450,6 +532,8 @@ public class QueueMetrics implements MetricsSource { pendingVCores.decr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreasePending(res, containers); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); } } @@ -482,6 +566,9 @@ public class QueueMetrics implements MetricsSource { allocatedVCores.incr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } if (decrPending) { @@ -510,12 +597,18 @@ public class QueueMetrics implements MetricsSource { allocatedVCores.incr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increaseAllocated(res); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } pendingMB.decr(res.getMemorySize()); pendingVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreasePending(res); + registerCustomResources( + queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); @@ -537,6 +630,9 @@ public class QueueMetrics implements MetricsSource { allocatedVCores.decr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); @@ -560,6 +656,9 @@ public class QueueMetrics implements MetricsSource { allocatedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreaseAllocated(res); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); @@ -597,6 +696,11 @@ public class QueueMetrics implements MetricsSource { if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources .increaseAggregatedPreemptedSeconds(res, seconds); + registerCustomResources( + queueMetricsForCustomResources.getAggregatePreemptedSeconds() + .getValues(), + AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX, + AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); } if (parent != null) { parent.updatePreemptedSecondsForCustomResources(res, seconds); @@ -623,6 +727,9 @@ public class QueueMetrics implements MetricsSource { reservedVCores.incr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increaseReserved(res); + registerCustomResources( + queueMetricsForCustomResources.getReservedValues(), + RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -639,6 +746,9 @@ public class QueueMetrics implements MetricsSource { reservedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreaseReserved(res); + registerCustomResources( + queueMetricsForCustomResources.getReservedValues(), + RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java index 05341aab10e..b49b125a974 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java @@ -43,6 +43,16 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMe import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2; final class ResourceMetricsChecker { private final static Logger LOG = @@ -52,21 +62,33 @@ final class ResourceMetricsChecker { GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG } + private static final ResourceMetricsChecker INITIAL_MANDATORY_RES_CHECKER = + new ResourceMetricsChecker().gaugeLong(ALLOCATED_MB, 0) + .gaugeInt(ALLOCATED_V_CORES, 0).gaugeInt(ALLOCATED_CONTAINERS, 0) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) + .counter(AGGREGATE_CONTAINERS_RELEASED, 0).gaugeLong(AVAILABLE_MB, 0) + .gaugeInt(AVAILABLE_V_CORES, 0).gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0).gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0).gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0); + private static final ResourceMetricsChecker INITIAL_CHECKER = - new ResourceMetricsChecker() - .gaugeLong(ALLOCATED_MB, 0) - .gaugeInt(ALLOCATED_V_CORES, 0) - .gaugeInt(ALLOCATED_CONTAINERS, 0) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) - .counter(AGGREGATE_CONTAINERS_RELEASED, 0) - .gaugeLong(AVAILABLE_MB, 0) - .gaugeInt(AVAILABLE_V_CORES, 0) - .gaugeLong(PENDING_MB, 0) - .gaugeInt(PENDING_V_CORES, 0) - .gaugeInt(PENDING_CONTAINERS, 0) - .gaugeLong(RESERVED_MB, 0) - .gaugeInt(RESERVED_V_CORES, 0) - .gaugeInt(RESERVED_CONTAINERS, 0); + new ResourceMetricsChecker().gaugeLong(ALLOCATED_MB, 0) + .gaugeInt(ALLOCATED_V_CORES, 0).gaugeInt(ALLOCATED_CONTAINERS, 0) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) + .counter(AGGREGATE_CONTAINERS_RELEASED, 0).gaugeLong(AVAILABLE_MB, 0) + .gaugeInt(AVAILABLE_V_CORES, 0).gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0).gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0).gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0).gaugeLong(ALLOCATED_CUSTOM_RES1, 0) + .gaugeLong(ALLOCATED_CUSTOM_RES2, 0).gaugeLong(AVAILABLE_CUSTOM_RES1, 0) + .gaugeLong(AVAILABLE_CUSTOM_RES2, 0).gaugeLong(PENDING_CUSTOM_RES1, 0) + .gaugeLong(PENDING_CUSTOM_RES2, 0).gaugeLong(RESERVED_CUSTOM_RES1, 0) + .gaugeLong(RESERVED_CUSTOM_RES2, 0) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1, 0) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2, 0); + + enum ResourceMetricsKey { ALLOCATED_MB("AllocatedMB", GAUGE_LONG), @@ -87,7 +109,18 @@ final class ResourceMetricsChecker { AGGREGATE_VCORE_SECONDS_PREEMPTED( "AggregateVcoreSecondsPreempted", COUNTER_LONG), AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED( - "AggregateMemoryMBSecondsPreempted", COUNTER_LONG); + "AggregateMemoryMBSecondsPreempted", COUNTER_LONG), + ALLOCATED_CUSTOM_RES1("AllocatedResource.custom_res_1", GAUGE_LONG), + ALLOCATED_CUSTOM_RES2("AllocatedResource.custom_res_2", GAUGE_LONG), + AVAILABLE_CUSTOM_RES1("AvailableResource.custom_res_1", GAUGE_LONG), + AVAILABLE_CUSTOM_RES2("AvailableResource.custom_res_2", GAUGE_LONG), + PENDING_CUSTOM_RES1("PendingResource.custom_res_1",GAUGE_LONG), + PENDING_CUSTOM_RES2("PendingResource.custom_res_2",GAUGE_LONG), + RESERVED_CUSTOM_RES1("ReservedResource.custom_res_1",GAUGE_LONG), + RESERVED_CUSTOM_RES2("ReservedResource.custom_res_2", GAUGE_LONG), + AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1("AggregatePreemptedSeconds.custom_res_1", GAUGE_LONG), + AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2("AggregatePreemptedSeconds.custom_res_2", GAUGE_LONG); + private String value; private ResourceMetricType type; @@ -131,6 +164,10 @@ final class ResourceMetricsChecker { return new ResourceMetricsChecker(INITIAL_CHECKER); } + public static ResourceMetricsChecker createMandatoryResourceChecker() { + return new ResourceMetricsChecker(INITIAL_MANDATORY_RES_CHECKER); + } + ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) { ensureTypeIsCorrect(key, GAUGE_LONG); gaugesLong.put(key, value); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 2066f607c53..33c39290de5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -105,13 +105,11 @@ public class TestQueueMetrics { USER, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 100 * GB) - .gaugeInt(AVAILABLE_V_CORES, 100) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(queueSource); + ResourceMetricsChecker rmChecker = + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(queueSource); metrics.runAppAttempt(app.getApplicationId(), USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) @@ -284,7 +282,7 @@ public class TestQueueMetrics { // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources ResourceMetricsChecker resMetricsQueueSourceChecker = - ResourceMetricsChecker.create() + ResourceMetricsChecker.createMandatoryResourceChecker() .gaugeLong(AVAILABLE_MB, 100 * GB) .gaugeInt(AVAILABLE_V_CORES, 100) .gaugeLong(PENDING_MB, 15 * GB) @@ -292,7 +290,7 @@ public class TestQueueMetrics { .gaugeInt(PENDING_CONTAINERS, 5) .checkAgainst(queueSource); ResourceMetricsChecker resMetricsUserSourceChecker = - ResourceMetricsChecker.create() + ResourceMetricsChecker.createMandatoryResourceChecker() .gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeInt(AVAILABLE_V_CORES, 10) .gaugeLong(PENDING_MB, 15 * GB) @@ -471,37 +469,25 @@ public class TestQueueMetrics { USER, 5, Resources.createResource(3*GB, 3)); ResourceMetricsChecker resMetricsQueueSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 100 * GB) - .gaugeInt(AVAILABLE_V_CORES, 100) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(leaf.queueSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.queueSource); ResourceMetricsChecker resMetricsParentQueueSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 100 * GB) - .gaugeInt(AVAILABLE_V_CORES, 100) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(root.queueSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.queueSource); ResourceMetricsChecker resMetricsUserSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 10 * GB) - .gaugeInt(AVAILABLE_V_CORES, 10) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(leaf.userSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.userSource); ResourceMetricsChecker resMetricsParentUserSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 10 * GB) - .gaugeInt(AVAILABLE_V_CORES, 10) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(root.userSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.userSource); leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER); appMetricsQueueSourceChecker = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java index 76a98490c21..ed0ae6e3305 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java @@ -73,6 +73,17 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -206,7 +217,11 @@ public class TestQueueMetricsForCustomResources { .gaugeLong(PENDING_MB, containers * testData.resource.getMemorySize()) .gaugeInt(PENDING_V_CORES, containers * - testData.resource.getVirtualCores()); + testData.resource.getVirtualCores()) + .gaugeLong(PENDING_CUSTOM_RES1, + containers * testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(PENDING_CUSTOM_RES2, + containers * testData.customResourceValues.get(CUSTOM_RES_2)); assertAllMetrics(testData.leafQueue, checker, QueueMetrics::getPendingResources, MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues( @@ -229,6 +244,12 @@ public class TestQueueMetricsForCustomResources { .gaugeInt(PENDING_CONTAINERS, 0) .gaugeLong(PENDING_MB, 0) .gaugeInt(PENDING_V_CORES, 0) + .gaugeLong(ALLOCATED_CUSTOM_RES1, + testData.containers + * testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(ALLOCATED_CUSTOM_RES2, + testData.containers + * testData.customResourceValues.get(CUSTOM_RES_2)) .checkAgainst(testData.leafQueue.queueSource); if (decreasePending) { assertAllMetrics(testData.leafQueue, checker, @@ -260,7 +281,11 @@ public class TestQueueMetricsForCustomResources { .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED, testData.resource.getMemorySize() * seconds) .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED, - testData.resource.getVirtualCores() * seconds); + testData.resource.getVirtualCores() * seconds) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1, + testData.customResourceValues.get(CUSTOM_RES_1) * seconds) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2, + testData.customResourceValues.get(CUSTOM_RES_2) * seconds); assertQueueMetricsOnly(testData.leafQueue, checker, this::convertPreemptedSecondsToResource, @@ -290,6 +315,10 @@ public class TestQueueMetricsForCustomResources { .gaugeInt(RESERVED_CONTAINERS, 1) .gaugeLong(RESERVED_MB, testData.resource.getMemorySize()) .gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores()) + .gaugeLong(RESERVED_CUSTOM_RES1, + testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(RESERVED_CUSTOM_RES2, + testData.customResourceValues.get(CUSTOM_RES_2)) .checkAgainst(testData.leafQueue.queueSource); assertAllMetrics(testData.leafQueue, checker, QueueMetrics::getReservedResources, @@ -382,6 +411,8 @@ public class TestQueueMetricsForCustomResources { ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, GB) .gaugeInt(AVAILABLE_V_CORES, 4) + .gaugeLong(AVAILABLE_CUSTOM_RES1, 5 * GB) + .gaugeLong(AVAILABLE_CUSTOM_RES2, 6 * GB) .checkAgainst(queueSource); assertCustomResourceValue(metrics, @@ -408,6 +439,8 @@ public class TestQueueMetricsForCustomResources { ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, GB) .gaugeInt(AVAILABLE_V_CORES, 4) + .gaugeLong(AVAILABLE_CUSTOM_RES1, 15 * GB) + .gaugeLong(AVAILABLE_CUSTOM_RES2, 20 * GB) .checkAgainst(queueSource); assertCustomResourceValue(metrics, @@ -447,12 +480,23 @@ public class TestQueueMetricsForCustomResources { final int vCoresToDecrease = resourceToDecrease.getVirtualCores(); final long memoryMBToDecrease = resourceToDecrease.getMemorySize(); final int containersAfterDecrease = containers - containersToDecrease; + final long customRes1ToDecrease = + resourceToDecrease.getResourceValue(CUSTOM_RES_1); + final long customRes2ToDecrease = + resourceToDecrease.getResourceValue(CUSTOM_RES_2); + final int vcoresAfterDecrease = (defaultResource.getVirtualCores() * containers) - (vCoresToDecrease * containersToDecrease); final long memoryAfterDecrease = (defaultResource.getMemorySize() * containers) - (memoryMBToDecrease * containersToDecrease); + final long customResource1AfterDecrease = + (testData.customResourceValues.get(CUSTOM_RES_1) * containers) + - (customRes1ToDecrease * containersToDecrease); + final long customResource2AfterDecrease = + (testData.customResourceValues.get(CUSTOM_RES_2) * containers) + - (customRes2ToDecrease * containersToDecrease); //first, increase resources to be able to decrease some testIncreasePendingResources(testData); @@ -470,6 +514,8 @@ public class TestQueueMetricsForCustomResources { .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease) .gaugeLong(PENDING_MB, memoryAfterDecrease) .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease) + .gaugeLong(PENDING_CUSTOM_RES1, customResource1AfterDecrease) + .gaugeLong(PENDING_CUSTOM_RES2, customResource2AfterDecrease) .checkAgainst(testData.leafQueue.queueSource); assertAllMetrics(testData.leafQueue, checker, @@ -524,7 +570,11 @@ public class TestQueueMetricsForCustomResources { .gaugeLong(ALLOCATED_MB, resource.getMemorySize()) .gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores()) .gaugeInt(PENDING_CONTAINERS, 1).gaugeLong(PENDING_MB, 0) - .gaugeInt(PENDING_V_CORES, 0); + .gaugeInt(PENDING_V_CORES, 0) + .gaugeLong(ALLOCATED_CUSTOM_RES1, + testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(ALLOCATED_CUSTOM_RES2, + testData.customResourceValues.get(CUSTOM_RES_2)); checker.checkAgainst(testData.leafQueue.queueSource); checker.checkAgainst(testData.leafQueue.getRoot().queueSource); @@ -615,6 +665,7 @@ public class TestQueueMetricsForCustomResources { .gaugeInt(RESERVED_CONTAINERS, 0) .gaugeLong(RESERVED_MB, 0) .gaugeInt(RESERVED_V_CORES, 0) + .gaugeLong(RESERVED_CUSTOM_RES1, 0).gaugeLong(RESERVED_CUSTOM_RES2, 0) .checkAgainst(testData.leafQueue.queueSource); assertAllMetrics(testData.leafQueue, checker, QueueMetrics::getReservedResources,