From e60b797c88541f94cecc7fdbcaad010c4742cfdb Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Thu, 4 Oct 2018 13:00:31 -0700 Subject: [PATCH] YARN-8750. Refactor TestQueueMetrics. (Contributed by Szilard Nemeth) --- .../scheduler/AppMetricsChecker.java | 122 ++++ .../scheduler/ResourceMetricsChecker.java | 170 ++++++ .../scheduler/TestQueueMetrics.java | 550 +++++++++++++----- 3 files changed, 701 insertions(+), 141 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java new file mode 100644 index 00000000000..8967234e63d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppMetricsChecker.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.collect.Maps; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_KILLED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED; + +final class AppMetricsChecker { + private final static Logger LOG = + LoggerFactory.getLogger(AppMetricsChecker.class); + + private static final AppMetricsChecker INITIAL_CHECKER = + new AppMetricsChecker() + .counter(APPS_SUBMITTED, 0) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 0) + .counter(APPS_COMPLETED, 0) + .counter(APPS_FAILED, 0) + .counter(APPS_KILLED, 0); + + enum AppMetricsKey { + APPS_SUBMITTED("AppsSubmitted"), + APPS_PENDING("AppsPending"), + APPS_RUNNING("AppsRunning"), + APPS_COMPLETED("AppsCompleted"), + APPS_FAILED("AppsFailed"), + APPS_KILLED("AppsKilled"); + + private String value; + + AppMetricsKey(String value) { + this.value = value; + } + } + private final Map gaugesInt; + private final Map counters; + + private AppMetricsChecker() { + this.gaugesInt = Maps.newHashMap(); + this.counters = Maps.newHashMap(); + } + + private AppMetricsChecker(AppMetricsChecker checker) { + this.gaugesInt = Maps.newHashMap(checker.gaugesInt); + this.counters = Maps.newHashMap(checker.counters); + } + + public static AppMetricsChecker createFromChecker(AppMetricsChecker checker) { + return new AppMetricsChecker(checker); + } + + public static AppMetricsChecker create() { + return new AppMetricsChecker(INITIAL_CHECKER); + } + + AppMetricsChecker gaugeInt(AppMetricsKey key, int value) { + gaugesInt.put(key, value); + return this; + } + + AppMetricsChecker counter(AppMetricsKey key, int value) { + counters.put(key, value); + return this; + } + + AppMetricsChecker checkAgainst(MetricsSource source, boolean all) { + if (source == null) { + throw new IllegalStateException( + "MetricsSource should not be null!"); + } + MetricsRecordBuilder recordBuilder = getMetrics(source, all); + logAssertingMessage(source); + + for (Map.Entry gauge : gaugesInt.entrySet()) { + assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder); + } + + for (Map.Entry counter : counters.entrySet()) { + assertCounter(counter.getKey().value, counter.getValue(), recordBuilder); + } + return this; + } + + private void logAssertingMessage(MetricsSource source) { + String queueName = ((QueueMetrics) source).queueName; + Map users = ((QueueMetrics) source).users; + + if (LOG.isDebugEnabled()) { + LOG.debug("Asserting App metrics.. QueueName: " + queueName + ", users: " + + (users != null && !users.isEmpty() ? users : "")); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java new file mode 100644 index 00000000000..cd617d7b9d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.collect.Maps; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; + +final class ResourceMetricsChecker { + private final static Logger LOG = + LoggerFactory.getLogger(ResourceMetricsChecker.class); + + private static final ResourceMetricsChecker INITIAL_CHECKER = + new ResourceMetricsChecker() + .gaugeLong(ALLOCATED_MB, 0) + .gaugeInt(ALLOCATED_V_CORES, 0) + .gaugeInt(ALLOCATED_CONTAINERS, 0) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) + .counter(AGGREGATE_CONTAINERS_RELEASED, 0) + .gaugeLong(AVAILABLE_MB, 0) + .gaugeInt(AVAILABLE_V_CORES, 0) + .gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0) + .gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0); + + enum ResourceMetricsKey { + ALLOCATED_MB("AllocatedMB"), + ALLOCATED_V_CORES("AllocatedVCores"), + ALLOCATED_CONTAINERS("AllocatedContainers"), + AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"), + AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"), + AVAILABLE_MB("AvailableMB"), + AVAILABLE_V_CORES("AvailableVCores"), + PENDING_MB("PendingMB"), + PENDING_V_CORES("PendingVCores"), + PENDING_CONTAINERS("PendingContainers"), + RESERVED_MB("ReservedMB"), + RESERVED_V_CORES("ReservedVCores"), + RESERVED_CONTAINERS("ReservedContainers"); + + private String value; + + ResourceMetricsKey(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + } + + private final Map gaugesLong; + private final Map gaugesInt; + private final Map counters; + + private ResourceMetricsChecker() { + this.gaugesLong = Maps.newHashMap(); + this.gaugesInt = Maps.newHashMap(); + this.counters = Maps.newHashMap(); + } + + private ResourceMetricsChecker(ResourceMetricsChecker checker) { + this.gaugesLong = Maps.newHashMap(checker.gaugesLong); + this.gaugesInt = Maps.newHashMap(checker.gaugesInt); + this.counters = Maps.newHashMap(checker.counters); + } + + public static ResourceMetricsChecker createFromChecker( + ResourceMetricsChecker checker) { + return new ResourceMetricsChecker(checker); + } + + public static ResourceMetricsChecker create() { + return new ResourceMetricsChecker(INITIAL_CHECKER); + } + + ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) { + gaugesLong.put(key, value); + return this; + } + + ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) { + gaugesInt.put(key, value); + return this; + } + + ResourceMetricsChecker counter(ResourceMetricsKey key, long value) { + counters.put(key, value); + return this; + } + + ResourceMetricsChecker checkAgainst(MetricsSource source) { + if (source == null) { + throw new IllegalStateException("MetricsSource should not be null!"); + } + MetricsRecordBuilder recordBuilder = getMetrics(source); + logAssertingMessage(source); + + for (Map.Entry gauge : gaugesLong.entrySet()) { + assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder); + } + + for (Map.Entry gauge : gaugesInt.entrySet()) { + assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder); + } + + for (Map.Entry counter : counters.entrySet()) { + assertCounter(counter.getKey().value, counter.getValue(), recordBuilder); + } + return this; + } + + private void logAssertingMessage(MetricsSource source) { + String queueName = ((QueueMetrics) source).queueName; + Map users = ((QueueMetrics) source).users; + + if (LOG.isDebugEnabled()) { + LOG.debug("Asserting Resource metrics.. QueueName: " + queueName + + ", users: " + (users != null && !users.isEmpty() ? users : "")); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 196d4c291cd..c971d655e59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .AppMetricsChecker.AppMetricsKey.*; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*; import static org.junit.Assert.assertNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -45,9 +47,8 @@ import org.junit.Before; import org.junit.Test; public class TestQueueMetrics { - static final int GB = 1024; // MB + private static final int GB = 1024; // MB private static final Configuration conf = new Configuration(); - private MetricsSystem ms; @Before @@ -56,7 +57,8 @@ public class TestQueueMetrics { QueueMetrics.clearQueueMetrics(); } - @Test public void testDefaultSingleQueueMetrics() { + @Test + public void testDefaultSingleQueueMetrics() { String queueName = "single"; String user = "alice"; @@ -67,9 +69,13 @@ public class TestQueueMetrics { metrics.submitApp(user); MetricsSource userSource = userSource(ms, queueName, user); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + AppMetricsChecker appMetricsChecker = AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(queueSource, true); metrics.submitAppAttempt(user); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(queueSource, true); metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); @@ -77,34 +83,63 @@ public class TestQueueMetrics { user, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); + ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 100 * GB) + .gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(queueSource); metrics.runAppAttempt(app.getApplicationId(), user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(queueSource, true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, user, 3, Resources.createResource(2*GB, 2), true); - checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .checkAgainst(queueSource); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, user, 1, Resources.createResource(2*GB, 2)); - checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); + rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .checkAgainst(queueSource); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, user, 0, Resources.createResource(2 * GB, 2)); - checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, - 0, 0, 0); + //nothing should change in values + rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) + .checkAgainst(queueSource); metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL, user, 0, Resources.createResource(2 * GB, 2)); - checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, - 0, 0, 0); + //nothing should change in values + ResourceMetricsChecker.createFromChecker(rmChecker) + .checkAgainst(queueSource); metrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .counter(APPS_SUBMITTED, 1) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(queueSource, true); metrics.finishApp(user, RMAppState.FINISHED); - checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); + AppMetricsChecker.createFromChecker(appMetricsChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(queueSource, true); assertNull(userSource); } @@ -120,50 +155,77 @@ public class TestQueueMetrics { metrics.submitApp(user); MetricsSource userSource = userSource(ms, queueName, user); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + AppMetricsChecker appMetricsChecker = AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(queueSource, true); metrics.submitAppAttempt(user); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(queueSource, true); metrics.runAppAttempt(app.getApplicationId(), user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(queueSource, true); metrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(queueSource, true); // As the application has failed, framework retries the same application // based on configuration metrics.submitAppAttempt(user); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(queueSource, true); metrics.runAppAttempt(app.getApplicationId(), user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(queueSource, true); // Suppose say application has failed this time as well. metrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(queueSource, true); // As the application has failed, framework retries the same application // based on configuration metrics.submitAppAttempt(user); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(queueSource, true); metrics.runAppAttempt(app.getApplicationId(), user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(queueSource, true); // Suppose say application has failed, and there's no more retries. metrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); + appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(queueSource, true); metrics.finishApp(user, RMAppState.FAILED); - checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); + AppMetricsChecker.createFromChecker(appMetricsChecker) + .gaugeInt(APPS_RUNNING, 0) + .counter(APPS_FAILED, 1) + .checkAgainst(queueSource, true); assertNull(userSource); } - @Test public void testSingleQueueWithUserMetrics() { + @Test + public void testSingleQueueWithUserMetrics() { String queueName = "single2"; String user = "dodo"; @@ -175,12 +237,22 @@ public class TestQueueMetrics { metrics.submitApp(user); MetricsSource userSource = userSource(ms, queueName, user); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); - checkApps(userSource, 1, 0, 0, 0, 0, 0, true); + AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(queueSource, true); + AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(userSource, true); metrics.submitAppAttempt(user); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); - checkApps(userSource, 1, 1, 0, 0, 0, 0, true); + appMetricsQueueSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsQueueSourceChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(queueSource, true); + appMetricsUserSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsUserSourceChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(userSource, true); metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); @@ -188,36 +260,97 @@ public class TestQueueMetrics { user, Resources.createResource(10*GB, 10)); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, user, 5, Resources.createResource(3*GB, 3)); + // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); - checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); + ResourceMetricsChecker resMetricsQueueSourceChecker = + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 100 * GB) + .gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(queueSource); + ResourceMetricsChecker resMetricsUserSourceChecker = + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 10 * GB) + .gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(userSource); metrics.runAppAttempt(app.getApplicationId(), user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); - checkApps(userSource, 1, 0, 1, 0, 0, 0, true); + appMetricsQueueSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsQueueSourceChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(queueSource, true); + appMetricsUserSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsUserSourceChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(userSource, true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, user, 3, Resources.createResource(2*GB, 2), true); - checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); + resMetricsQueueSourceChecker = + ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .checkAgainst(queueSource); + resMetricsUserSourceChecker = + ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .checkAgainst(userSource); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, user, 1, Resources.createResource(2*GB, 2)); - checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); + ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .checkAgainst(queueSource); + ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .checkAgainst(userSource); metrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); - checkApps(userSource, 1, 0, 0, 0, 0, 0, true); + appMetricsQueueSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(queueSource, true); + appMetricsUserSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(userSource, true); metrics.finishApp(user, RMAppState.FINISHED); - checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); - checkApps(userSource, 1, 0, 0, 1, 0, 0, true); + AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(queueSource, true); + AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(userSource, true); } - - @Test public void testNodeTypeMetrics() { + @Test + public void testNodeTypeMetrics() { String parentQueueName = "root"; String leafQueueName = "root.leaf"; String user = "alice"; @@ -237,33 +370,32 @@ public class TestQueueMetrics { MetricsSource parentUserSource = userSource(ms, parentQueueName, user); metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL); - checkAggregatedNodeTypes(queueSource,1L,0L,0L); - checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L); - checkAggregatedNodeTypes(userSource,1L,0L,0L); - checkAggregatedNodeTypes(parentUserSource,1L,0L,0L); + checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L); + checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L); + checkAggregatedNodeTypes(userSource, 1L, 0L, 0L); + checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L); metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL); - checkAggregatedNodeTypes(queueSource,1L,1L,0L); - checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L); - checkAggregatedNodeTypes(userSource,1L,1L,0L); - checkAggregatedNodeTypes(parentUserSource,1L,1L,0L); + checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L); + checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L); + checkAggregatedNodeTypes(userSource, 1L, 1L, 0L); + checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L); metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); - checkAggregatedNodeTypes(queueSource,1L,1L,1L); - checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L); - checkAggregatedNodeTypes(userSource,1L,1L,1L); - checkAggregatedNodeTypes(parentUserSource,1L,1L,1L); + checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L); + checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L); + checkAggregatedNodeTypes(userSource, 1L, 1L, 1L); + checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L); metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); - checkAggregatedNodeTypes(queueSource,1L,1L,2L); - checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L); - checkAggregatedNodeTypes(userSource,1L,1L,2L); - checkAggregatedNodeTypes(parentUserSource,1L,1L,2L); - + checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L); + checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L); + checkAggregatedNodeTypes(userSource, 1L, 1L, 2L); + checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 2L); } - - @Test public void testTwoLevelWithUserMetrics() { + @Test + public void testTwoLevelWithUserMetrics() { String parentQueueName = "root"; String leafQueueName = "root.leaf"; String user = "alice"; @@ -282,16 +414,38 @@ public class TestQueueMetrics { MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); - checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); - checkApps(userSource, 1, 0, 0, 0, 0, 0, true); - checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true); + AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(queueSource, true); + AppMetricsChecker appMetricsParentQueueSourceChecker = + AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(parentQueueSource, true); + AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(userSource, true); + AppMetricsChecker appMetricsParentUserSourceChecker = + AppMetricsChecker.create() + .counter(APPS_SUBMITTED, 1) + .checkAgainst(parentUserSource, true); metrics.submitAppAttempt(user); - checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); - checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true); - checkApps(userSource, 1, 1, 0, 0, 0, 0, true); - checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true); + appMetricsQueueSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(queueSource, true); + appMetricsParentQueueSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(parentQueueSource, true); + appMetricsUserSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(userSource, true); + appMetricsParentUserSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker) + .gaugeInt(APPS_PENDING, 1) + .checkAgainst(parentUserSource, true); parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); @@ -303,14 +457,51 @@ public class TestQueueMetrics { user, Resources.createResource(10*GB, 10)); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, user, 5, Resources.createResource(3*GB, 3)); - checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); - checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); - checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); - checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); + + ResourceMetricsChecker resMetricsQueueSourceChecker = + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 100 * GB) + .gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(queueSource); + ResourceMetricsChecker resMetricsParentQueueSourceChecker = + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 100 * GB) + .gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(parentQueueSource); + ResourceMetricsChecker resMetricsUserSourceChecker = + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 10 * GB) + .gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(userSource); + ResourceMetricsChecker resMetricsParentUserSourceChecker = + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, 10 * GB) + .gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB) + .gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5) + .checkAgainst(parentUserSource); metrics.runAppAttempt(app.getApplicationId(), user); - checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); - checkApps(userSource, 1, 0, 1, 0, 0, 0, true); + appMetricsQueueSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(queueSource, true); + appMetricsUserSourceChecker = + AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 1) + .checkAgainst(userSource, true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, user, 3, Resources.createResource(2*GB, 2), true); @@ -318,32 +509,139 @@ public class TestQueueMetrics { user, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); - checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); - checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); - checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); + resMetricsQueueSourceChecker = + ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .gaugeLong(RESERVED_MB, 3 * GB) + .gaugeInt(RESERVED_V_CORES, 3) + .gaugeInt(RESERVED_CONTAINERS, 1) + .checkAgainst(queueSource); + resMetricsParentQueueSourceChecker = + ResourceMetricsChecker + .createFromChecker(resMetricsParentQueueSourceChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .gaugeLong(RESERVED_MB, 3 * GB) + .gaugeInt(RESERVED_V_CORES, 3) + .gaugeInt(RESERVED_CONTAINERS, 1) + .checkAgainst(parentQueueSource); + resMetricsUserSourceChecker = + ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .gaugeLong(RESERVED_MB, 3 * GB) + .gaugeInt(RESERVED_V_CORES, 3) + .gaugeInt(RESERVED_CONTAINERS, 1) + .checkAgainst(userSource); + resMetricsParentUserSourceChecker = ResourceMetricsChecker + .createFromChecker(resMetricsParentUserSourceChecker) + .gaugeLong(ALLOCATED_MB, 6 * GB) + .gaugeInt(ALLOCATED_V_CORES, 6) + .gaugeInt(ALLOCATED_CONTAINERS, 3) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 3) + .gaugeLong(PENDING_MB, 9 * GB) + .gaugeInt(PENDING_V_CORES, 9) + .gaugeInt(PENDING_CONTAINERS, 2) + .gaugeLong(RESERVED_MB, 3 * GB) + .gaugeInt(RESERVED_V_CORES, 3) + .gaugeInt(RESERVED_CONTAINERS, 1) + .checkAgainst(parentUserSource); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, user, 1, Resources.createResource(2*GB, 2)); metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, user, Resources.createResource(3*GB, 3)); - checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); - checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); + ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0) + .checkAgainst(queueSource); + ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0) + .checkAgainst(parentQueueSource); + ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0) + .checkAgainst(userSource); + ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker) + .gaugeLong(ALLOCATED_MB, 4 * GB) + .gaugeInt(ALLOCATED_V_CORES, 4) + .gaugeInt(ALLOCATED_CONTAINERS, 2) + .counter(AGGREGATE_CONTAINERS_RELEASED, 1) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0) + .checkAgainst(parentUserSource); metrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); - checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); - checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); - checkApps(userSource, 1, 0, 0, 0, 0, 0, true); - checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true); + appMetricsQueueSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsQueueSourceChecker) + .counter(APPS_SUBMITTED, 1) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(queueSource, true); + appMetricsParentQueueSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsParentQueueSourceChecker) + .counter(APPS_SUBMITTED, 1) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(parentQueueSource, true); + appMetricsUserSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsUserSourceChecker) + .counter(APPS_SUBMITTED, 1) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(userSource, true); + appMetricsParentUserSourceChecker = AppMetricsChecker + .createFromChecker(appMetricsParentUserSourceChecker) + .counter(APPS_SUBMITTED, 1) + .gaugeInt(APPS_PENDING, 0) + .gaugeInt(APPS_RUNNING, 0) + .checkAgainst(parentUserSource, true); metrics.finishApp(user, RMAppState.FINISHED); - checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); - checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true); - checkApps(userSource, 1, 0, 0, 1, 0, 0, true); - checkApps(parentUserSource, 1, 0, 0, 1, 0, 0, true); + AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(queueSource, true); + AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(parentQueueSource, true); + AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(userSource, true); + AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker) + .counter(APPS_COMPLETED, 1) + .checkAgainst(parentUserSource, true); } @Test @@ -383,8 +681,9 @@ public class TestQueueMetrics { FifoScheduler.class, ResourceScheduler.class); MockRM rm = new MockRM(conf); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); - checkApps(metrics, 0, 0, 0, 0, 0, 0, true); - MetricsAsserts.assertGauge("ReservedContainers", 0, metrics); + AppMetricsChecker.create() + .checkAgainst(metrics, true); + MetricsAsserts.assertGauge(RESERVED_CONTAINERS.getValue(), 0, metrics); } // This is to test all metrics can consistently show up if specified true to @@ -396,52 +695,23 @@ public class TestQueueMetrics { QueueMetrics.forQueue(ms, queueName, null, false, conf); MetricsSource queueSource = queueSource(ms, queueName); - checkApps(queueSource, 0, 0, 0, 0, 0, 0, true); + AppMetricsChecker.create() + .checkAgainst(queueSource, true); try { // do not collect all metrics - checkApps(queueSource, 0, 0, 0, 0, 0, 0, false); + AppMetricsChecker.create() + .checkAgainst(queueSource, false); Assert.fail(); } catch (AssertionError e) { - Assert.assertTrue(e.getMessage().contains( - "Expected exactly one metric for name ")); + Assert.assertTrue( + e.getMessage().contains("Expected exactly one metric for name ")); } // collect all metrics - checkApps(queueSource, 0, 0, 0, 0, 0, 0, true); + AppMetricsChecker.create() + .checkAgainst(queueSource, true); } - public static void checkApps(MetricsSource source, int submitted, int pending, - int running, int completed, int failed, int killed, boolean all) { - MetricsRecordBuilder rb = getMetrics(source, all); - assertCounter("AppsSubmitted", submitted, rb); - assertGauge("AppsPending", pending, rb); - assertGauge("AppsRunning", running, rb); - assertCounter("AppsCompleted", completed, rb); - assertCounter("AppsFailed", failed, rb); - assertCounter("AppsKilled", killed, rb); - } - - public static void checkResources(MetricsSource source, long allocatedMB, - int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, - long aggreReleasedCtnrs, long availableMB, int availableCores, long pendingMB, - int pendingCores, int pendingCtnrs, long reservedMB, int reservedCores, - int reservedCtnrs) { - MetricsRecordBuilder rb = getMetrics(source); - assertGauge("AllocatedMB", allocatedMB, rb); - assertGauge("AllocatedVCores", allocatedCores, rb); - assertGauge("AllocatedContainers", allocCtnrs, rb); - assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); - assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); - assertGauge("AvailableMB", availableMB, rb); - assertGauge("AvailableVCores", availableCores, rb); - assertGauge("PendingMB", pendingMB, rb); - assertGauge("PendingVCores", pendingCores, rb); - assertGauge("PendingContainers", pendingCtnrs, rb); - assertGauge("ReservedMB", reservedMB, rb); - assertGauge("ReservedVCores", reservedCores, rb); - assertGauge("ReservedContainers", reservedCtnrs, rb); - } - - public static void checkAggregatedNodeTypes(MetricsSource source, + private static void checkAggregatedNodeTypes(MetricsSource source, long nodeLocal, long rackLocal, long offSwitch) { MetricsRecordBuilder rb = getMetrics(source); assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb); @@ -459,14 +729,12 @@ public class TestQueueMetrics { } public static MetricsSource queueSource(MetricsSystem ms, String queue) { - MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString()); - return s; + return ms.getSource(QueueMetrics.sourceName(queue).toString()); } - public static MetricsSource userSource(MetricsSystem ms, String queue, - String user) { - MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue). + private static MetricsSource userSource(MetricsSystem ms, String queue, + String user) { + return ms.getSource(QueueMetrics.sourceName(queue). append(",user=").append(user).toString()); - return s; } }