YARN-8750. Refactor TestQueueMetrics. (Contributed by Szilard Nemeth)

This commit is contained in:
Haibo Chen 2018-10-04 13:00:31 -07:00
parent b6d5d84e07
commit e60b797c88
3 changed files with 701 additions and 141 deletions

View File

@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.collect.Maps;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_KILLED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
/**
 * Fluent test helper that collects expected values for application-level
 * metrics and asserts them against a {@link MetricsSource}.
 *
 * <p>Instances are obtained through {@link #create()} (all known metrics
 * expected to be zero) or {@link #createFromChecker(AppMetricsChecker)} (copy
 * of an existing expectation set). Expectations are then overridden with
 * {@link #gaugeInt} / {@link #counter} and verified with
 * {@link #checkAgainst}.
 */
final class AppMetricsChecker {
  private static final Logger LOG =
      LoggerFactory.getLogger(AppMetricsChecker.class);

  /**
   * Template holding a zero expectation for every key. It is never handed
   * out directly; {@link #create()} always copies it.
   */
  private static final AppMetricsChecker INITIAL_CHECKER =
      new AppMetricsChecker()
          .counter(APPS_SUBMITTED, 0)
          .gaugeInt(APPS_PENDING, 0)
          .gaugeInt(APPS_RUNNING, 0)
          .counter(APPS_COMPLETED, 0)
          .counter(APPS_FAILED, 0)
          .counter(APPS_KILLED, 0);

  /** Keys of the checked metrics, each carrying the metric's record name. */
  enum AppMetricsKey {
    APPS_SUBMITTED("AppsSubmitted"),
    APPS_PENDING("AppsPending"),
    APPS_RUNNING("AppsRunning"),
    APPS_COMPLETED("AppsCompleted"),
    APPS_FAILED("AppsFailed"),
    APPS_KILLED("AppsKilled");

    // Name passed to assertGauge / assertCounter; never reassigned.
    private final String value;

    AppMetricsKey(String value) {
      this.value = value;
    }
  }

  /** Expected values asserted through {@code assertGauge}. */
  private final Map<AppMetricsKey, Integer> gaugesInt;
  /** Expected values asserted through {@code assertCounter}. */
  private final Map<AppMetricsKey, Integer> counters;

  /** Creates a checker without any expectation. */
  private AppMetricsChecker() {
    this.gaugesInt = Maps.newHashMap();
    this.counters = Maps.newHashMap();
  }

  /**
   * Copy constructor: the new checker gets its own maps, so later changes do
   * not affect {@code checker}.
   */
  private AppMetricsChecker(AppMetricsChecker checker) {
    this.gaugesInt = Maps.newHashMap(checker.gaugesInt);
    this.counters = Maps.newHashMap(checker.counters);
  }

  /**
   * Returns a new checker that starts with the expectations of
   * {@code checker}.
   *
   * @param checker the checker whose expectations are copied
   * @return an independent copy of {@code checker}
   */
  public static AppMetricsChecker createFromChecker(AppMetricsChecker checker) {
    return new AppMetricsChecker(checker);
  }

  /**
   * Returns a new checker that expects every known metric to be zero.
   *
   * @return a copy of the all-zero template
   */
  public static AppMetricsChecker create() {
    return new AppMetricsChecker(INITIAL_CHECKER);
  }

  /**
   * Sets the expected value of a gauge.
   *
   * @param key the metric to expect
   * @param value the expected value
   * @return this checker, for chaining
   */
  AppMetricsChecker gaugeInt(AppMetricsKey key, int value) {
    gaugesInt.put(key, value);
    return this;
  }

  /**
   * Sets the expected value of a counter.
   *
   * @param key the metric to expect
   * @param value the expected value
   * @return this checker, for chaining
   */
  AppMetricsChecker counter(AppMetricsKey key, int value) {
    counters.put(key, value);
    return this;
  }

  /**
   * Asserts every recorded gauge and counter expectation against the metrics
   * of {@code source}.
   *
   * @param source the metrics source to verify
   * @param all forwarded unchanged to {@code getMetrics(source, all)}
   * @return this checker, for chaining
   * @throws IllegalStateException if {@code source} is {@code null}
   */
  AppMetricsChecker checkAgainst(MetricsSource source, boolean all) {
    if (source == null) {
      throw new IllegalStateException(
          "MetricsSource should not be null!");
    }
    MetricsRecordBuilder recordBuilder = getMetrics(source, all);
    logAssertingMessage(source);

    for (Map.Entry<AppMetricsKey, Integer> gauge : gaugesInt.entrySet()) {
      assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
    }
    for (Map.Entry<AppMetricsKey, Integer> counter : counters.entrySet()) {
      assertCounter(counter.getKey().value, counter.getValue(), recordBuilder);
    }
    return this;
  }

  /**
   * Emits a debug line describing the source that is about to be checked.
   *
   * <p>Nothing is evaluated unless debug logging is enabled, and the
   * {@link QueueMetrics} specific fields are only read when the source really
   * is a {@link QueueMetrics}, so logging can never fail the assertion run
   * with a {@link ClassCastException}.
   */
  private void logAssertingMessage(MetricsSource source) {
    if (!LOG.isDebugEnabled()) {
      return;
    }
    if (!(source instanceof QueueMetrics)) {
      LOG.debug("Asserting App metrics.. source: {}", source);
      return;
    }
    QueueMetrics queueMetrics = (QueueMetrics) source;
    Map<String, QueueMetrics> users = queueMetrics.users;
    // A missing or empty user map is logged as an empty string.
    LOG.debug("Asserting App metrics.. QueueName: {}, users: {}",
        queueMetrics.queueName,
        users != null && !users.isEmpty() ? users : "");
  }
}

View File

@ -0,0 +1,170 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.collect.Maps;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
/**
 * Fluent test helper that collects expected values for resource-level
 * metrics and asserts them against a {@link MetricsSource}.
 *
 * <p>Instances are obtained through {@link #create()} (all known metrics
 * expected to be zero) or
 * {@link #createFromChecker(ResourceMetricsChecker)} (copy of an existing
 * expectation set). Expectations are then overridden with
 * {@link #gaugeLong} / {@link #gaugeInt} / {@link #counter} and verified with
 * {@link #checkAgainst}.
 */
final class ResourceMetricsChecker {
  private static final Logger LOG =
      LoggerFactory.getLogger(ResourceMetricsChecker.class);

  /**
   * Template holding a zero expectation for every key. It is never handed
   * out directly; {@link #create()} always copies it.
   */
  private static final ResourceMetricsChecker INITIAL_CHECKER =
      new ResourceMetricsChecker()
          .gaugeLong(ALLOCATED_MB, 0)
          .gaugeInt(ALLOCATED_V_CORES, 0)
          .gaugeInt(ALLOCATED_CONTAINERS, 0)
          .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0)
          .counter(AGGREGATE_CONTAINERS_RELEASED, 0)
          .gaugeLong(AVAILABLE_MB, 0)
          .gaugeInt(AVAILABLE_V_CORES, 0)
          .gaugeLong(PENDING_MB, 0)
          .gaugeInt(PENDING_V_CORES, 0)
          .gaugeInt(PENDING_CONTAINERS, 0)
          .gaugeLong(RESERVED_MB, 0)
          .gaugeInt(RESERVED_V_CORES, 0)
          .gaugeInt(RESERVED_CONTAINERS, 0);

  /** Keys of the checked metrics, each carrying the metric's record name. */
  enum ResourceMetricsKey {
    ALLOCATED_MB("AllocatedMB"),
    ALLOCATED_V_CORES("AllocatedVCores"),
    ALLOCATED_CONTAINERS("AllocatedContainers"),
    AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"),
    AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"),
    AVAILABLE_MB("AvailableMB"),
    AVAILABLE_V_CORES("AvailableVCores"),
    PENDING_MB("PendingMB"),
    PENDING_V_CORES("PendingVCores"),
    PENDING_CONTAINERS("PendingContainers"),
    RESERVED_MB("ReservedMB"),
    RESERVED_V_CORES("ReservedVCores"),
    RESERVED_CONTAINERS("ReservedContainers");

    // Name passed to assertGauge / assertCounter; never reassigned.
    private final String value;

    ResourceMetricsKey(String value) {
      this.value = value;
    }

    /**
     * @return the metric name associated with this key
     */
    public String getValue() {
      return value;
    }
  }

  /** Expected long values asserted through {@code assertGauge}. */
  private final Map<ResourceMetricsKey, Long> gaugesLong;
  /** Expected int values asserted through {@code assertGauge}. */
  private final Map<ResourceMetricsKey, Integer> gaugesInt;
  /** Expected values asserted through {@code assertCounter}. */
  private final Map<ResourceMetricsKey, Long> counters;

  /** Creates a checker without any expectation. */
  private ResourceMetricsChecker() {
    this.gaugesLong = Maps.newHashMap();
    this.gaugesInt = Maps.newHashMap();
    this.counters = Maps.newHashMap();
  }

  /**
   * Copy constructor: the new checker gets its own maps, so later changes do
   * not affect {@code checker}.
   */
  private ResourceMetricsChecker(ResourceMetricsChecker checker) {
    this.gaugesLong = Maps.newHashMap(checker.gaugesLong);
    this.gaugesInt = Maps.newHashMap(checker.gaugesInt);
    this.counters = Maps.newHashMap(checker.counters);
  }

  /**
   * Returns a new checker that starts with the expectations of
   * {@code checker}.
   *
   * @param checker the checker whose expectations are copied
   * @return an independent copy of {@code checker}
   */
  public static ResourceMetricsChecker createFromChecker(
      ResourceMetricsChecker checker) {
    return new ResourceMetricsChecker(checker);
  }

  /**
   * Returns a new checker that expects every known metric to be zero.
   *
   * @return a copy of the all-zero template
   */
  public static ResourceMetricsChecker create() {
    return new ResourceMetricsChecker(INITIAL_CHECKER);
  }

  /**
   * Sets the expected value of a long gauge.
   *
   * @param key the metric to expect
   * @param value the expected value
   * @return this checker, for chaining
   */
  ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
    gaugesLong.put(key, value);
    return this;
  }

  /**
   * Sets the expected value of an int gauge.
   *
   * @param key the metric to expect
   * @param value the expected value
   * @return this checker, for chaining
   */
  ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
    gaugesInt.put(key, value);
    return this;
  }

  /**
   * Sets the expected value of a counter.
   *
   * @param key the metric to expect
   * @param value the expected value
   * @return this checker, for chaining
   */
  ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
    counters.put(key, value);
    return this;
  }

  /**
   * Asserts every recorded gauge and counter expectation against the metrics
   * of {@code source}.
   *
   * @param source the metrics source to verify
   * @return this checker, for chaining
   * @throws IllegalStateException if {@code source} is {@code null}
   */
  ResourceMetricsChecker checkAgainst(MetricsSource source) {
    if (source == null) {
      throw new IllegalStateException("MetricsSource should not be null!");
    }
    MetricsRecordBuilder recordBuilder = getMetrics(source);
    logAssertingMessage(source);

    for (Map.Entry<ResourceMetricsKey, Long> gauge : gaugesLong.entrySet()) {
      assertGauge(gauge.getKey().getValue(), gauge.getValue(), recordBuilder);
    }
    for (Map.Entry<ResourceMetricsKey, Integer> gauge : gaugesInt.entrySet()) {
      assertGauge(gauge.getKey().getValue(), gauge.getValue(), recordBuilder);
    }
    for (Map.Entry<ResourceMetricsKey, Long> counter : counters.entrySet()) {
      assertCounter(counter.getKey().getValue(), counter.getValue(),
          recordBuilder);
    }
    return this;
  }

  /**
   * Emits a debug line describing the source that is about to be checked.
   *
   * <p>Nothing is evaluated unless debug logging is enabled, and the
   * {@link QueueMetrics} specific fields are only read when the source really
   * is a {@link QueueMetrics}, so logging can never fail the assertion run
   * with a {@link ClassCastException}.
   */
  private void logAssertingMessage(MetricsSource source) {
    if (!LOG.isDebugEnabled()) {
      return;
    }
    if (!(source instanceof QueueMetrics)) {
      LOG.debug("Asserting Resource metrics.. source: {}", source);
      return;
    }
    QueueMetrics queueMetrics = (QueueMetrics) source;
    Map<String, QueueMetrics> users = queueMetrics.users;
    // A missing or empty user map is logged as an empty string.
    LOG.debug("Asserting Resource metrics.. QueueName: {}, users: {}",
        queueMetrics.queueName,
        users != null && !users.isEmpty() ? users : "");
  }
}

View File

@ -19,8 +19,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AppMetricsChecker.AppMetricsKey.*;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -45,9 +47,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
public class TestQueueMetrics { public class TestQueueMetrics {
static final int GB = 1024; // MB private static final int GB = 1024; // MB
private static final Configuration conf = new Configuration(); private static final Configuration conf = new Configuration();
private MetricsSystem ms; private MetricsSystem ms;
@Before @Before
@ -56,7 +57,8 @@ public class TestQueueMetrics {
QueueMetrics.clearQueueMetrics(); QueueMetrics.clearQueueMetrics();
} }
@Test public void testDefaultSingleQueueMetrics() { @Test
public void testDefaultSingleQueueMetrics() {
String queueName = "single"; String queueName = "single";
String user = "alice"; String user = "alice";
@ -67,9 +69,13 @@ public class TestQueueMetrics {
metrics.submitApp(user); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
@ -77,34 +83,63 @@ public class TestQueueMetrics {
user, 5, Resources.createResource(3*GB, 3)); user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true); user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.checkAgainst(queueSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2)); user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.checkAgainst(queueSource);
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2)); user, 0, Resources.createResource(2 * GB, 2));
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, //nothing should change in values
0, 0, 0); rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2)); user, 0, Resources.createResource(2 * GB, 2));
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, //nothing should change in values
0, 0, 0); ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource);
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(user, RMAppState.FINISHED); metrics.finishApp(user, RMAppState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
assertNull(userSource); assertNull(userSource);
} }
@ -120,50 +155,77 @@ public class TestQueueMetrics {
metrics.submitApp(user); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
// Suppose say application has failed this time as well. // Suppose say application has failed this time as well.
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
// Suppose say application has failed, and there's no more retries. // Suppose say application has failed, and there's no more retries.
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
metrics.finishApp(user, RMAppState.FAILED); metrics.finishApp(user, RMAppState.FAILED);
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true); AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0)
.counter(APPS_FAILED, 1)
.checkAgainst(queueSource, true);
assertNull(userSource); assertNull(userSource);
} }
@Test public void testSingleQueueWithUserMetrics() { @Test
public void testSingleQueueWithUserMetrics() {
String queueName = "single2"; String queueName = "single2";
String user = "dodo"; String user = "dodo";
@ -175,12 +237,22 @@ public class TestQueueMetrics {
metrics.submitApp(user); metrics.submitApp(user);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
checkApps(userSource, 1, 0, 0, 0, 0, 0, true); .counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); appMetricsQueueSourceChecker = AppMetricsChecker
checkApps(userSource, 1, 1, 0, 0, 0, 0, true); .createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(userSource, true);
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
@ -188,36 +260,97 @@ public class TestQueueMetrics {
user, Resources.createResource(10*GB, 10)); user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3)); user, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); ResourceMetricsChecker resMetricsQueueSourceChecker =
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); appMetricsQueueSourceChecker = AppMetricsChecker
checkApps(userSource, 1, 0, 1, 0, 0, 0, true); .createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true); user, 3, Resources.createResource(2*GB, 2), true);
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); resMetricsQueueSourceChecker =
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.checkAgainst(queueSource);
resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.checkAgainst(userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2)); user, 1, Resources.createResource(2*GB, 2));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.checkAgainst(queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.checkAgainst(userSource);
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); appMetricsQueueSourceChecker =
checkApps(userSource, 1, 0, 0, 0, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
metrics.finishApp(user, RMAppState.FINISHED); metrics.finishApp(user, RMAppState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
checkApps(userSource, 1, 0, 0, 1, 0, 0, true); .counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(userSource, true);
} }
@Test
@Test public void testNodeTypeMetrics() { public void testNodeTypeMetrics() {
String parentQueueName = "root"; String parentQueueName = "root";
String leafQueueName = "root.leaf"; String leafQueueName = "root.leaf";
String user = "alice"; String user = "alice";
@ -237,33 +370,32 @@ public class TestQueueMetrics {
MetricsSource parentUserSource = userSource(ms, parentQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL); metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL);
checkAggregatedNodeTypes(queueSource,1L,0L,0L); checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentQueueSource,1L,0L,0L); checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(userSource,1L,0L,0L); checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentUserSource,1L,0L,0L); checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL); metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL);
checkAggregatedNodeTypes(queueSource,1L,1L,0L); checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentQueueSource,1L,1L,0L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(userSource,1L,1L,0L); checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentUserSource,1L,1L,0L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource,1L,1L,1L); checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentQueueSource,1L,1L,1L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(userSource,1L,1L,1L); checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentUserSource,1L,1L,1L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource,1L,1L,2L); checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentQueueSource,1L,1L,2L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(userSource,1L,1L,2L); checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentUserSource,1L,1L,2L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 2L);
} }
@Test
@Test public void testTwoLevelWithUserMetrics() { public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root"; String parentQueueName = "root";
String leafQueueName = "root.leaf"; String leafQueueName = "root.leaf";
String user = "alice"; String user = "alice";
@ -282,16 +414,38 @@ public class TestQueueMetrics {
MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); .counter(APPS_SUBMITTED, 1)
checkApps(userSource, 1, 0, 0, 0, 0, 0, true); .checkAgainst(queueSource, true);
checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true); AppMetricsChecker appMetricsParentQueueSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(parentQueueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true);
AppMetricsChecker appMetricsParentUserSourceChecker =
AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1)
.checkAgainst(parentUserSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(user);
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); appMetricsQueueSourceChecker =
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
checkApps(userSource, 1, 1, 0, 0, 0, 0, true); .gaugeInt(APPS_PENDING, 1)
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true); .checkAgainst(queueSource, true);
appMetricsParentQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(parentQueueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(userSource, true);
appMetricsParentUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.gaugeInt(APPS_PENDING, 1)
.checkAgainst(parentUserSource, true);
parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
@ -303,14 +457,51 @@ public class TestQueueMetrics {
user, Resources.createResource(10*GB, 10)); user, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3)); user, 5, Resources.createResource(3*GB, 3));
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); ResourceMetricsChecker resMetricsQueueSourceChecker =
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); ResourceMetricsChecker.create()
checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); .gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource);
ResourceMetricsChecker resMetricsParentQueueSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB)
.gaugeInt(AVAILABLE_V_CORES, 100)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(parentQueueSource);
ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource);
ResourceMetricsChecker resMetricsParentUserSourceChecker =
ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB)
.gaugeInt(AVAILABLE_V_CORES, 10)
.gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(parentUserSource);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), user);
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); appMetricsQueueSourceChecker =
checkApps(userSource, 1, 0, 1, 0, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true);
appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1)
.checkAgainst(userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true); user, 3, Resources.createResource(2*GB, 2), true);
@ -318,32 +509,139 @@ public class TestQueueMetrics {
user, Resources.createResource(3*GB, 3)); user, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); resMetricsQueueSourceChecker =
checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); .gaugeLong(ALLOCATED_MB, 6 * GB)
checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); .gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(queueSource);
resMetricsParentQueueSourceChecker =
ResourceMetricsChecker
.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(parentQueueSource);
resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(userSource);
resMetricsParentUserSourceChecker = ResourceMetricsChecker
.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6)
.gaugeInt(ALLOCATED_CONTAINERS, 3)
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
.gaugeLong(PENDING_MB, 9 * GB)
.gaugeInt(PENDING_V_CORES, 9)
.gaugeInt(PENDING_CONTAINERS, 2)
.gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(parentUserSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2)); user, 1, Resources.createResource(2*GB, 2));
metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3)); user, Resources.createResource(3*GB, 3));
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); .gaugeLong(ALLOCATED_MB, 4 * GB)
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); .gaugeInt(ALLOCATED_V_CORES, 4)
checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); .gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(parentQueueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(userSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4)
.gaugeInt(ALLOCATED_CONTAINERS, 2)
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
.gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(parentUserSource);
metrics.finishAppAttempt( metrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true); appMetricsQueueSourceChecker = AppMetricsChecker
checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true); .createFromChecker(appMetricsQueueSourceChecker)
checkApps(userSource, 1, 0, 0, 0, 0, 0, true); .counter(APPS_SUBMITTED, 1)
checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true); .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true);
appMetricsParentQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(parentQueueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true);
appMetricsParentUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0)
.checkAgainst(parentUserSource, true);
metrics.finishApp(user, RMAppState.FINISHED); metrics.finishApp(user, RMAppState.FINISHED);
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true); .counter(APPS_COMPLETED, 1)
checkApps(userSource, 1, 0, 0, 1, 0, 0, true); .checkAgainst(queueSource, true);
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0, true); AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(parentQueueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(userSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_COMPLETED, 1)
.checkAgainst(parentUserSource, true);
} }
@Test @Test
@ -383,8 +681,9 @@ public class TestQueueMetrics {
FifoScheduler.class, ResourceScheduler.class); FifoScheduler.class, ResourceScheduler.class);
MockRM rm = new MockRM(conf); MockRM rm = new MockRM(conf);
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
checkApps(metrics, 0, 0, 0, 0, 0, 0, true); AppMetricsChecker.create()
MetricsAsserts.assertGauge("ReservedContainers", 0, metrics); .checkAgainst(metrics, true);
MetricsAsserts.assertGauge(RESERVED_CONTAINERS.getValue(), 0, metrics);
} }
// This is to test all metrics can consistently show up if specified true to // This is to test all metrics can consistently show up if specified true to
@ -396,52 +695,23 @@ public class TestQueueMetrics {
QueueMetrics.forQueue(ms, queueName, null, false, conf); QueueMetrics.forQueue(ms, queueName, null, false, conf);
MetricsSource queueSource = queueSource(ms, queueName); MetricsSource queueSource = queueSource(ms, queueName);
checkApps(queueSource, 0, 0, 0, 0, 0, 0, true); AppMetricsChecker.create()
.checkAgainst(queueSource, true);
try { try {
// do not collect all metrics // do not collect all metrics
checkApps(queueSource, 0, 0, 0, 0, 0, 0, false); AppMetricsChecker.create()
.checkAgainst(queueSource, false);
Assert.fail(); Assert.fail();
} catch (AssertionError e) { } catch (AssertionError e) {
Assert.assertTrue(e.getMessage().contains( Assert.assertTrue(
"Expected exactly one metric for name ")); e.getMessage().contains("Expected exactly one metric for name "));
} }
// collect all metrics // collect all metrics
checkApps(queueSource, 0, 0, 0, 0, 0, 0, true); AppMetricsChecker.create()
.checkAgainst(queueSource, true);
} }
/**
 * Asserts the six application metrics published by {@code source}.
 *
 * Takes one record via {@code getMetrics(source, all)} and checks
 * "AppsSubmitted", "AppsCompleted", "AppsFailed" and "AppsKilled" as
 * counters, and "AppsPending" and "AppsRunning" as gauges.
 *
 * NOTE(review): in this side-by-side rendering the first line below also
 * carries the new-revision header of checkAggregatedNodeTypes; checkApps
 * itself only appears in the old-revision column.
 *
 * @param source metrics source to read
 * @param submitted expected "AppsSubmitted" counter
 * @param pending expected "AppsPending" gauge
 * @param running expected "AppsRunning" gauge
 * @param completed expected "AppsCompleted" counter
 * @param failed expected "AppsFailed" counter
 * @param killed expected "AppsKilled" counter
 * @param all forwarded unchanged to getMetrics; call sites comment a true
 *            value as "collect all metrics"
 */
public static void checkApps(MetricsSource source, int submitted, int pending, private static void checkAggregatedNodeTypes(MetricsSource source,
int running, int completed, int failed, int killed, boolean all) {
MetricsRecordBuilder rb = getMetrics(source, all);
assertCounter("AppsSubmitted", submitted, rb);
assertGauge("AppsPending", pending, rb);
assertGauge("AppsRunning", running, rb);
assertCounter("AppsCompleted", completed, rb);
assertCounter("AppsFailed", failed, rb);
assertCounter("AppsKilled", killed, rb);
}
/**
 * Asserts the thirteen resource metrics published by {@code source}.
 *
 * Takes one record via {@code getMetrics(source)} and checks the
 * allocated, available, pending and reserved values as gauges, and the two
 * "AggregateContainers*" values as counters.
 *
 * NOTE(review): this helper only appears in the old-revision column of the
 * side-by-side rendering.
 *
 * @param source metrics source to read
 * @param allocatedMB expected "AllocatedMB" gauge
 * @param allocatedCores expected "AllocatedVCores" gauge
 * @param allocCtnrs expected "AllocatedContainers" gauge
 * @param aggreAllocCtnrs expected "AggregateContainersAllocated" counter
 * @param aggreReleasedCtnrs expected "AggregateContainersReleased" counter
 * @param availableMB expected "AvailableMB" gauge
 * @param availableCores expected "AvailableVCores" gauge
 * @param pendingMB expected "PendingMB" gauge
 * @param pendingCores expected "PendingVCores" gauge
 * @param pendingCtnrs expected "PendingContainers" gauge
 * @param reservedMB expected "ReservedMB" gauge
 * @param reservedCores expected "ReservedVCores" gauge
 * @param reservedCtnrs expected "ReservedContainers" gauge
 */
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, int availableCores, long pendingMB,
int pendingCores, int pendingCtnrs, long reservedMB, int reservedCores,
int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb);
}
public static void checkAggregatedNodeTypes(MetricsSource source,
long nodeLocal, long rackLocal, long offSwitch) { long nodeLocal, long rackLocal, long offSwitch) {
MetricsRecordBuilder rb = getMetrics(source); MetricsRecordBuilder rb = getMetrics(source);
assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb); assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
@ -459,14 +729,12 @@ public class TestQueueMetrics {
} }
/**
 * Returns what {@code ms.getSource} yields for the name built by
 * {@code QueueMetrics.sourceName(queue).toString()}.
 *
 * NOTE(review): both revisions of the side-by-side rendering appear on the
 * lines below; the old one goes through a local variable, the new one
 * returns the lookup directly.
 *
 * @param ms metrics system to query
 * @param queue queue name passed to QueueMetrics.sourceName
 * @return the result of the getSource lookup
 */
public static MetricsSource queueSource(MetricsSystem ms, String queue) { public static MetricsSource queueSource(MetricsSystem ms, String queue) {
MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString()); return ms.getSource(QueueMetrics.sourceName(queue).toString());
return s;
} }
/**
 * Returns what {@code ms.getSource} yields for a per-user source name.
 *
 * The name is {@code QueueMetrics.sourceName(queue)} with ",user=" and
 * {@code user} appended.
 *
 * NOTE(review): both revisions of the side-by-side rendering appear on the
 * lines below; the old column declares the method public, the new column
 * declares it private - confirm which visibility callers need.
 *
 * @param ms metrics system to query
 * @param queue queue name passed to QueueMetrics.sourceName
 * @param user user name appended after ",user="
 * @return the result of the getSource lookup
 */
public static MetricsSource userSource(MetricsSystem ms, String queue, private static MetricsSource userSource(MetricsSystem ms, String queue,
String user) { String user) {
MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue). return ms.getSource(QueueMetrics.sourceName(queue).
append(",user=").append(user).toString()); append(",user=").append(user).toString());
return s;
} }
} }