YARN-8750. Refactor TestQueueMetrics. (Contributed by Szilard Nemeth)
(cherry picked from commit e60b797c88
)
This commit is contained in:
parent
b10fdd136a
commit
1344823d4d
|
@ -0,0 +1,122 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_KILLED;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
|
||||||
|
|
||||||
|
final class AppMetricsChecker {
|
||||||
|
private final static Logger LOG =
|
||||||
|
LoggerFactory.getLogger(AppMetricsChecker.class);
|
||||||
|
|
||||||
|
private static final AppMetricsChecker INITIAL_CHECKER =
|
||||||
|
new AppMetricsChecker()
|
||||||
|
.counter(APPS_SUBMITTED, 0)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.counter(APPS_COMPLETED, 0)
|
||||||
|
.counter(APPS_FAILED, 0)
|
||||||
|
.counter(APPS_KILLED, 0);
|
||||||
|
|
||||||
|
enum AppMetricsKey {
|
||||||
|
APPS_SUBMITTED("AppsSubmitted"),
|
||||||
|
APPS_PENDING("AppsPending"),
|
||||||
|
APPS_RUNNING("AppsRunning"),
|
||||||
|
APPS_COMPLETED("AppsCompleted"),
|
||||||
|
APPS_FAILED("AppsFailed"),
|
||||||
|
APPS_KILLED("AppsKilled");
|
||||||
|
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
AppMetricsKey(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
private final Map<AppMetricsKey, Integer> gaugesInt;
|
||||||
|
private final Map<AppMetricsKey, Integer> counters;
|
||||||
|
|
||||||
|
private AppMetricsChecker() {
|
||||||
|
this.gaugesInt = Maps.newHashMap();
|
||||||
|
this.counters = Maps.newHashMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
private AppMetricsChecker(AppMetricsChecker checker) {
|
||||||
|
this.gaugesInt = Maps.newHashMap(checker.gaugesInt);
|
||||||
|
this.counters = Maps.newHashMap(checker.counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AppMetricsChecker createFromChecker(AppMetricsChecker checker) {
|
||||||
|
return new AppMetricsChecker(checker);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AppMetricsChecker create() {
|
||||||
|
return new AppMetricsChecker(INITIAL_CHECKER);
|
||||||
|
}
|
||||||
|
|
||||||
|
AppMetricsChecker gaugeInt(AppMetricsKey key, int value) {
|
||||||
|
gaugesInt.put(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
AppMetricsChecker counter(AppMetricsKey key, int value) {
|
||||||
|
counters.put(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
AppMetricsChecker checkAgainst(MetricsSource source, boolean all) {
|
||||||
|
if (source == null) {
|
||||||
|
throw new IllegalStateException(
|
||||||
|
"MetricsSource should not be null!");
|
||||||
|
}
|
||||||
|
MetricsRecordBuilder recordBuilder = getMetrics(source, all);
|
||||||
|
logAssertingMessage(source);
|
||||||
|
|
||||||
|
for (Map.Entry<AppMetricsKey, Integer> gauge : gaugesInt.entrySet()) {
|
||||||
|
assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<AppMetricsKey, Integer> counter : counters.entrySet()) {
|
||||||
|
assertCounter(counter.getKey().value, counter.getValue(), recordBuilder);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void logAssertingMessage(MetricsSource source) {
|
||||||
|
String queueName = ((QueueMetrics) source).queueName;
|
||||||
|
Map<String, QueueMetrics> users = ((QueueMetrics) source).users;
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Asserting App metrics.. QueueName: " + queueName + ", users: "
|
||||||
|
+ (users != null && !users.isEmpty() ? users : ""));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,170 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
|
import org.apache.hadoop.metrics2.MetricsSource;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
||||||
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
|
||||||
|
|
||||||
|
final class ResourceMetricsChecker {
|
||||||
|
private final static Logger LOG =
|
||||||
|
LoggerFactory.getLogger(ResourceMetricsChecker.class);
|
||||||
|
|
||||||
|
private static final ResourceMetricsChecker INITIAL_CHECKER =
|
||||||
|
new ResourceMetricsChecker()
|
||||||
|
.gaugeLong(ALLOCATED_MB, 0)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 0)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 0)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 0)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 0)
|
||||||
|
.gaugeLong(AVAILABLE_MB, 0)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 0)
|
||||||
|
.gaugeLong(PENDING_MB, 0)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 0)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 0)
|
||||||
|
.gaugeLong(RESERVED_MB, 0)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 0)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 0);
|
||||||
|
|
||||||
|
enum ResourceMetricsKey {
|
||||||
|
ALLOCATED_MB("AllocatedMB"),
|
||||||
|
ALLOCATED_V_CORES("AllocatedVCores"),
|
||||||
|
ALLOCATED_CONTAINERS("AllocatedContainers"),
|
||||||
|
AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"),
|
||||||
|
AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"),
|
||||||
|
AVAILABLE_MB("AvailableMB"),
|
||||||
|
AVAILABLE_V_CORES("AvailableVCores"),
|
||||||
|
PENDING_MB("PendingMB"),
|
||||||
|
PENDING_V_CORES("PendingVCores"),
|
||||||
|
PENDING_CONTAINERS("PendingContainers"),
|
||||||
|
RESERVED_MB("ReservedMB"),
|
||||||
|
RESERVED_V_CORES("ReservedVCores"),
|
||||||
|
RESERVED_CONTAINERS("ReservedContainers");
|
||||||
|
|
||||||
|
private String value;
|
||||||
|
|
||||||
|
ResourceMetricsKey(String value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private final Map<ResourceMetricsKey, Long> gaugesLong;
|
||||||
|
private final Map<ResourceMetricsKey, Integer> gaugesInt;
|
||||||
|
private final Map<ResourceMetricsKey, Long> counters;
|
||||||
|
|
||||||
|
private ResourceMetricsChecker() {
|
||||||
|
this.gaugesLong = Maps.newHashMap();
|
||||||
|
this.gaugesInt = Maps.newHashMap();
|
||||||
|
this.counters = Maps.newHashMap();
|
||||||
|
}
|
||||||
|
|
||||||
|
private ResourceMetricsChecker(ResourceMetricsChecker checker) {
|
||||||
|
this.gaugesLong = Maps.newHashMap(checker.gaugesLong);
|
||||||
|
this.gaugesInt = Maps.newHashMap(checker.gaugesInt);
|
||||||
|
this.counters = Maps.newHashMap(checker.counters);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ResourceMetricsChecker createFromChecker(
|
||||||
|
ResourceMetricsChecker checker) {
|
||||||
|
return new ResourceMetricsChecker(checker);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static ResourceMetricsChecker create() {
|
||||||
|
return new ResourceMetricsChecker(INITIAL_CHECKER);
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
|
||||||
|
gaugesLong.put(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
|
||||||
|
gaugesInt.put(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
|
||||||
|
counters.put(key, value);
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
ResourceMetricsChecker checkAgainst(MetricsSource source) {
|
||||||
|
if (source == null) {
|
||||||
|
throw new IllegalStateException("MetricsSource should not be null!");
|
||||||
|
}
|
||||||
|
MetricsRecordBuilder recordBuilder = getMetrics(source);
|
||||||
|
logAssertingMessage(source);
|
||||||
|
|
||||||
|
for (Map.Entry<ResourceMetricsKey, Long> gauge : gaugesLong.entrySet()) {
|
||||||
|
assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<ResourceMetricsKey, Integer> gauge : gaugesInt.entrySet()) {
|
||||||
|
assertGauge(gauge.getKey().value, gauge.getValue(), recordBuilder);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Map.Entry<ResourceMetricsKey, Long> counter : counters.entrySet()) {
|
||||||
|
assertCounter(counter.getKey().value, counter.getValue(), recordBuilder);
|
||||||
|
}
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void logAssertingMessage(MetricsSource source) {
|
||||||
|
String queueName = ((QueueMetrics) source).queueName;
|
||||||
|
Map<String, QueueMetrics> users = ((QueueMetrics) source).users;
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Asserting Resource metrics.. QueueName: " + queueName
|
||||||
|
+ ", users: " + (users != null && !users.isEmpty() ? users : ""));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -19,8 +19,10 @@
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
|
|
||||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||||
|
.AppMetricsChecker.AppMetricsKey.*;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
@ -45,9 +47,8 @@ import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestQueueMetrics {
|
public class TestQueueMetrics {
|
||||||
static final int GB = 1024; // MB
|
private static final int GB = 1024; // MB
|
||||||
private static final Configuration conf = new Configuration();
|
private static final Configuration conf = new Configuration();
|
||||||
|
|
||||||
private MetricsSystem ms;
|
private MetricsSystem ms;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
|
@ -56,7 +57,8 @@ public class TestQueueMetrics {
|
||||||
QueueMetrics.clearQueueMetrics();
|
QueueMetrics.clearQueueMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void testDefaultSingleQueueMetrics() {
|
@Test
|
||||||
|
public void testDefaultSingleQueueMetrics() {
|
||||||
String queueName = "single";
|
String queueName = "single";
|
||||||
String user = "alice";
|
String user = "alice";
|
||||||
|
|
||||||
|
@ -67,9 +69,13 @@ public class TestQueueMetrics {
|
||||||
|
|
||||||
metrics.submitApp(user);
|
metrics.submitApp(user);
|
||||||
MetricsSource userSource = userSource(ms, queueName, user);
|
MetricsSource userSource = userSource(ms, queueName, user);
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
|
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
|
||||||
Resources.createResource(100*GB, 100));
|
Resources.createResource(100*GB, 100));
|
||||||
|
@ -77,34 +83,63 @@ public class TestQueueMetrics {
|
||||||
user, 5, Resources.createResource(3*GB, 3));
|
user, 5, Resources.createResource(3*GB, 3));
|
||||||
// Available resources is set externally, as it depends on dynamic
|
// Available resources is set externally, as it depends on dynamic
|
||||||
// configurable cluster/queue resources
|
// configurable cluster/queue resources
|
||||||
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
|
||||||
|
.gaugeLong(AVAILABLE_MB, 100 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 100)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 3, Resources.createResource(2*GB, 2), true);
|
user, 3, Resources.createResource(2*GB, 2), true);
|
||||||
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
|
||||||
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 1, Resources.createResource(2*GB, 2));
|
user, 1, Resources.createResource(2*GB, 2));
|
||||||
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
|
||||||
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 0, Resources.createResource(2 * GB, 2));
|
user, 0, Resources.createResource(2 * GB, 2));
|
||||||
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
|
//nothing should change in values
|
||||||
0, 0, 0);
|
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
|
||||||
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 0, Resources.createResource(2 * GB, 2));
|
user, 0, Resources.createResource(2 * GB, 2));
|
||||||
checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2,
|
//nothing should change in values
|
||||||
0, 0, 0);
|
ResourceMetricsChecker.createFromChecker(rmChecker)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
|
||||||
metrics.finishAppAttempt(
|
metrics.finishAppAttempt(
|
||||||
app.getApplicationId(), app.isPending(), app.getUser());
|
app.getApplicationId(), app.isPending(), app.getUser());
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
metrics.finishApp(user, RMAppState.FINISHED);
|
metrics.finishApp(user, RMAppState.FINISHED);
|
||||||
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.counter(APPS_COMPLETED, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
assertNull(userSource);
|
assertNull(userSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,50 +155,77 @@ public class TestQueueMetrics {
|
||||||
|
|
||||||
metrics.submitApp(user);
|
metrics.submitApp(user);
|
||||||
MetricsSource userSource = userSource(ms, queueName, user);
|
MetricsSource userSource = userSource(ms, queueName, user);
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.finishAppAttempt(
|
metrics.finishAppAttempt(
|
||||||
app.getApplicationId(), app.isPending(), app.getUser());
|
app.getApplicationId(), app.isPending(), app.getUser());
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
// As the application has failed, framework retries the same application
|
// As the application has failed, framework retries the same application
|
||||||
// based on configuration
|
// based on configuration
|
||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
// Suppose say application has failed this time as well.
|
// Suppose say application has failed this time as well.
|
||||||
metrics.finishAppAttempt(
|
metrics.finishAppAttempt(
|
||||||
app.getApplicationId(), app.isPending(), app.getUser());
|
app.getApplicationId(), app.isPending(), app.getUser());
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
// As the application has failed, framework retries the same application
|
// As the application has failed, framework retries the same application
|
||||||
// based on configuration
|
// based on configuration
|
||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
// Suppose say application has failed, and there's no more retries.
|
// Suppose say application has failed, and there's no more retries.
|
||||||
metrics.finishAppAttempt(
|
metrics.finishAppAttempt(
|
||||||
app.getApplicationId(), app.isPending(), app.getUser());
|
app.getApplicationId(), app.isPending(), app.getUser());
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
metrics.finishApp(user, RMAppState.FAILED);
|
metrics.finishApp(user, RMAppState.FAILED);
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 1, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsChecker)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.counter(APPS_FAILED, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
|
||||||
assertNull(userSource);
|
assertNull(userSource);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test public void testSingleQueueWithUserMetrics() {
|
@Test
|
||||||
|
public void testSingleQueueWithUserMetrics() {
|
||||||
String queueName = "single2";
|
String queueName = "single2";
|
||||||
String user = "dodo";
|
String user = "dodo";
|
||||||
|
|
||||||
|
@ -175,12 +237,22 @@ public class TestQueueMetrics {
|
||||||
metrics.submitApp(user);
|
metrics.submitApp(user);
|
||||||
MetricsSource userSource = userSource(ms, queueName, user);
|
MetricsSource userSource = userSource(ms, queueName, user);
|
||||||
|
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
|
||||||
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
|
||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
appMetricsQueueSourceChecker = AppMetricsChecker
|
||||||
checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
|
.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
appMetricsUserSourceChecker = AppMetricsChecker
|
||||||
|
.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
|
||||||
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
|
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
|
||||||
Resources.createResource(100*GB, 100));
|
Resources.createResource(100*GB, 100));
|
||||||
|
@ -188,36 +260,97 @@ public class TestQueueMetrics {
|
||||||
user, Resources.createResource(10*GB, 10));
|
user, Resources.createResource(10*GB, 10));
|
||||||
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 5, Resources.createResource(3*GB, 3));
|
user, 5, Resources.createResource(3*GB, 3));
|
||||||
|
|
||||||
// Available resources is set externally, as it depends on dynamic
|
// Available resources is set externally, as it depends on dynamic
|
||||||
// configurable cluster/queue resources
|
// configurable cluster/queue resources
|
||||||
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
ResourceMetricsChecker resMetricsQueueSourceChecker =
|
||||||
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
|
ResourceMetricsChecker.create()
|
||||||
|
.gaugeLong(AVAILABLE_MB, 100 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 100)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
ResourceMetricsChecker resMetricsUserSourceChecker =
|
||||||
|
ResourceMetricsChecker.create()
|
||||||
|
.gaugeLong(AVAILABLE_MB, 10 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 10)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(userSource);
|
||||||
|
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
appMetricsQueueSourceChecker = AppMetricsChecker
|
||||||
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
|
.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
appMetricsUserSourceChecker = AppMetricsChecker
|
||||||
|
.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
|
||||||
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 3, Resources.createResource(2*GB, 2), true);
|
user, 3, Resources.createResource(2*GB, 2), true);
|
||||||
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
resMetricsQueueSourceChecker =
|
||||||
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
|
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
resMetricsUserSourceChecker =
|
||||||
|
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.checkAgainst(userSource);
|
||||||
|
|
||||||
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 1, Resources.createResource(2*GB, 2));
|
user, 1, Resources.createResource(2*GB, 2));
|
||||||
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
|
||||||
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.checkAgainst(userSource);
|
||||||
|
|
||||||
metrics.finishAppAttempt(
|
metrics.finishAppAttempt(
|
||||||
app.getApplicationId(), app.isPending(), app.getUser());
|
app.getApplicationId(), app.isPending(), app.getUser());
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
appMetricsQueueSourceChecker =
|
||||||
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
appMetricsUserSourceChecker =
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
metrics.finishApp(user, RMAppState.FINISHED);
|
metrics.finishApp(user, RMAppState.FINISHED);
|
||||||
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
|
.counter(APPS_COMPLETED, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.counter(APPS_COMPLETED, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
@Test public void testNodeTypeMetrics() {
|
public void testNodeTypeMetrics() {
|
||||||
String parentQueueName = "root";
|
String parentQueueName = "root";
|
||||||
String leafQueueName = "root.leaf";
|
String leafQueueName = "root.leaf";
|
||||||
String user = "alice";
|
String user = "alice";
|
||||||
|
@ -259,11 +392,10 @@ public class TestQueueMetrics {
|
||||||
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
|
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
|
||||||
checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
|
checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
|
||||||
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 2L);
|
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 2L);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
@Test public void testTwoLevelWithUserMetrics() {
|
public void testTwoLevelWithUserMetrics() {
|
||||||
String parentQueueName = "root";
|
String parentQueueName = "root";
|
||||||
String leafQueueName = "root.leaf";
|
String leafQueueName = "root.leaf";
|
||||||
String user = "alice";
|
String user = "alice";
|
||||||
|
@ -282,16 +414,38 @@ public class TestQueueMetrics {
|
||||||
MetricsSource userSource = userSource(ms, leafQueueName, user);
|
MetricsSource userSource = userSource(ms, leafQueueName, user);
|
||||||
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
|
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
|
||||||
|
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
|
||||||
checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
|
.counter(APPS_SUBMITTED, 1)
|
||||||
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
|
.checkAgainst(queueSource, true);
|
||||||
checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker appMetricsParentQueueSourceChecker =
|
||||||
|
AppMetricsChecker.create()
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(parentQueueSource, true);
|
||||||
|
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
AppMetricsChecker appMetricsParentUserSourceChecker =
|
||||||
|
AppMetricsChecker.create()
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.checkAgainst(parentUserSource, true);
|
||||||
|
|
||||||
metrics.submitAppAttempt(user);
|
metrics.submitAppAttempt(user);
|
||||||
checkApps(queueSource, 1, 1, 0, 0, 0, 0, true);
|
appMetricsQueueSourceChecker =
|
||||||
checkApps(parentQueueSource, 1, 1, 0, 0, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
checkApps(userSource, 1, 1, 0, 0, 0, 0, true);
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true);
|
.checkAgainst(queueSource, true);
|
||||||
|
appMetricsParentQueueSourceChecker =
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(parentQueueSource, true);
|
||||||
|
appMetricsUserSourceChecker =
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
appMetricsParentUserSourceChecker =
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 1)
|
||||||
|
.checkAgainst(parentUserSource, true);
|
||||||
|
|
||||||
parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
|
parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
|
||||||
Resources.createResource(100*GB, 100));
|
Resources.createResource(100*GB, 100));
|
||||||
|
@ -303,14 +457,51 @@ public class TestQueueMetrics {
|
||||||
user, Resources.createResource(10*GB, 10));
|
user, Resources.createResource(10*GB, 10));
|
||||||
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 5, Resources.createResource(3*GB, 3));
|
user, 5, Resources.createResource(3*GB, 3));
|
||||||
checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
|
||||||
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0);
|
ResourceMetricsChecker resMetricsQueueSourceChecker =
|
||||||
checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
|
ResourceMetricsChecker.create()
|
||||||
checkResources(parentUserSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0);
|
.gaugeLong(AVAILABLE_MB, 100 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 100)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
ResourceMetricsChecker resMetricsParentQueueSourceChecker =
|
||||||
|
ResourceMetricsChecker.create()
|
||||||
|
.gaugeLong(AVAILABLE_MB, 100 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 100)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(parentQueueSource);
|
||||||
|
ResourceMetricsChecker resMetricsUserSourceChecker =
|
||||||
|
ResourceMetricsChecker.create()
|
||||||
|
.gaugeLong(AVAILABLE_MB, 10 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 10)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(userSource);
|
||||||
|
ResourceMetricsChecker resMetricsParentUserSourceChecker =
|
||||||
|
ResourceMetricsChecker.create()
|
||||||
|
.gaugeLong(AVAILABLE_MB, 10 * GB)
|
||||||
|
.gaugeInt(AVAILABLE_V_CORES, 10)
|
||||||
|
.gaugeLong(PENDING_MB, 15 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 15)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 5)
|
||||||
|
.checkAgainst(parentUserSource);
|
||||||
|
|
||||||
metrics.runAppAttempt(app.getApplicationId(), user);
|
metrics.runAppAttempt(app.getApplicationId(), user);
|
||||||
checkApps(queueSource, 1, 0, 1, 0, 0, 0, true);
|
appMetricsQueueSourceChecker =
|
||||||
checkApps(userSource, 1, 0, 1, 0, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
appMetricsUserSourceChecker =
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
|
||||||
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 3, Resources.createResource(2*GB, 2), true);
|
user, 3, Resources.createResource(2*GB, 2), true);
|
||||||
|
@ -318,32 +509,139 @@ public class TestQueueMetrics {
|
||||||
user, Resources.createResource(3*GB, 3));
|
user, Resources.createResource(3*GB, 3));
|
||||||
// Available resources is set externally, as it depends on dynamic
|
// Available resources is set externally, as it depends on dynamic
|
||||||
// configurable cluster/queue resources
|
// configurable cluster/queue resources
|
||||||
checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
|
resMetricsQueueSourceChecker =
|
||||||
checkResources(parentQueueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1);
|
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
|
||||||
checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1);
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.gaugeLong(RESERVED_MB, 3 * GB)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 3)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 1)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
resMetricsParentQueueSourceChecker =
|
||||||
|
ResourceMetricsChecker
|
||||||
|
.createFromChecker(resMetricsParentQueueSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.gaugeLong(RESERVED_MB, 3 * GB)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 3)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 1)
|
||||||
|
.checkAgainst(parentQueueSource);
|
||||||
|
resMetricsUserSourceChecker =
|
||||||
|
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.gaugeLong(RESERVED_MB, 3 * GB)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 3)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 1)
|
||||||
|
.checkAgainst(userSource);
|
||||||
|
resMetricsParentUserSourceChecker = ResourceMetricsChecker
|
||||||
|
.createFromChecker(resMetricsParentUserSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 6 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 6)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 3)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_ALLOCATED, 3)
|
||||||
|
.gaugeLong(PENDING_MB, 9 * GB)
|
||||||
|
.gaugeInt(PENDING_V_CORES, 9)
|
||||||
|
.gaugeInt(PENDING_CONTAINERS, 2)
|
||||||
|
.gaugeLong(RESERVED_MB, 3 * GB)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 3)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 1)
|
||||||
|
.checkAgainst(parentUserSource);
|
||||||
|
|
||||||
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
|
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, 1, Resources.createResource(2*GB, 2));
|
user, 1, Resources.createResource(2*GB, 2));
|
||||||
metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
|
metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
|
||||||
user, Resources.createResource(3*GB, 3));
|
user, Resources.createResource(3*GB, 3));
|
||||||
checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
|
||||||
checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0);
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
checkResources(parentUserSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0);
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.gaugeLong(RESERVED_MB, 0)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 0)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 0)
|
||||||
|
.checkAgainst(queueSource);
|
||||||
|
ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.gaugeLong(RESERVED_MB, 0)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 0)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 0)
|
||||||
|
.checkAgainst(parentQueueSource);
|
||||||
|
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.gaugeLong(RESERVED_MB, 0)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 0)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 0)
|
||||||
|
.checkAgainst(userSource);
|
||||||
|
ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
|
||||||
|
.gaugeLong(ALLOCATED_MB, 4 * GB)
|
||||||
|
.gaugeInt(ALLOCATED_V_CORES, 4)
|
||||||
|
.gaugeInt(ALLOCATED_CONTAINERS, 2)
|
||||||
|
.counter(AGGREGATE_CONTAINERS_RELEASED, 1)
|
||||||
|
.gaugeLong(RESERVED_MB, 0)
|
||||||
|
.gaugeInt(RESERVED_V_CORES, 0)
|
||||||
|
.gaugeInt(RESERVED_CONTAINERS, 0)
|
||||||
|
.checkAgainst(parentUserSource);
|
||||||
|
|
||||||
metrics.finishAppAttempt(
|
metrics.finishAppAttempt(
|
||||||
app.getApplicationId(), app.isPending(), app.getUser());
|
app.getApplicationId(), app.isPending(), app.getUser());
|
||||||
checkApps(queueSource, 1, 0, 0, 0, 0, 0, true);
|
appMetricsQueueSourceChecker = AppMetricsChecker
|
||||||
checkApps(parentQueueSource, 1, 0, 0, 0, 0, 0, true);
|
.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
checkApps(userSource, 1, 0, 0, 0, 0, 0, true);
|
.counter(APPS_SUBMITTED, 1)
|
||||||
checkApps(parentUserSource, 1, 0, 0, 0, 0, 0, true);
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
|
appMetricsParentQueueSourceChecker = AppMetricsChecker
|
||||||
|
.createFromChecker(appMetricsParentQueueSourceChecker)
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(parentQueueSource, true);
|
||||||
|
appMetricsUserSourceChecker = AppMetricsChecker
|
||||||
|
.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
appMetricsParentUserSourceChecker = AppMetricsChecker
|
||||||
|
.createFromChecker(appMetricsParentUserSourceChecker)
|
||||||
|
.counter(APPS_SUBMITTED, 1)
|
||||||
|
.gaugeInt(APPS_PENDING, 0)
|
||||||
|
.gaugeInt(APPS_RUNNING, 0)
|
||||||
|
.checkAgainst(parentUserSource, true);
|
||||||
|
|
||||||
metrics.finishApp(user, RMAppState.FINISHED);
|
metrics.finishApp(user, RMAppState.FINISHED);
|
||||||
checkApps(queueSource, 1, 0, 0, 1, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
|
||||||
checkApps(parentQueueSource, 1, 0, 0, 1, 0, 0, true);
|
.counter(APPS_COMPLETED, 1)
|
||||||
checkApps(userSource, 1, 0, 0, 1, 0, 0, true);
|
.checkAgainst(queueSource, true);
|
||||||
checkApps(parentUserSource, 1, 0, 0, 1, 0, 0, true);
|
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
|
||||||
|
.counter(APPS_COMPLETED, 1)
|
||||||
|
.checkAgainst(parentQueueSource, true);
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
|
||||||
|
.counter(APPS_COMPLETED, 1)
|
||||||
|
.checkAgainst(userSource, true);
|
||||||
|
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
|
||||||
|
.counter(APPS_COMPLETED, 1)
|
||||||
|
.checkAgainst(parentUserSource, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -383,8 +681,9 @@ public class TestQueueMetrics {
|
||||||
FifoScheduler.class, ResourceScheduler.class);
|
FifoScheduler.class, ResourceScheduler.class);
|
||||||
MockRM rm = new MockRM(conf);
|
MockRM rm = new MockRM(conf);
|
||||||
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
|
||||||
checkApps(metrics, 0, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker.create()
|
||||||
MetricsAsserts.assertGauge("ReservedContainers", 0, metrics);
|
.checkAgainst(metrics, true);
|
||||||
|
MetricsAsserts.assertGauge(RESERVED_CONTAINERS.getValue(), 0, metrics);
|
||||||
}
|
}
|
||||||
|
|
||||||
// This is to test all metrics can consistently show up if specified true to
|
// This is to test all metrics can consistently show up if specified true to
|
||||||
|
@ -396,52 +695,23 @@ public class TestQueueMetrics {
|
||||||
QueueMetrics.forQueue(ms, queueName, null, false, conf);
|
QueueMetrics.forQueue(ms, queueName, null, false, conf);
|
||||||
MetricsSource queueSource = queueSource(ms, queueName);
|
MetricsSource queueSource = queueSource(ms, queueName);
|
||||||
|
|
||||||
checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker.create()
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
try {
|
try {
|
||||||
// do not collect all metrics
|
// do not collect all metrics
|
||||||
checkApps(queueSource, 0, 0, 0, 0, 0, 0, false);
|
AppMetricsChecker.create()
|
||||||
|
.checkAgainst(queueSource, false);
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
} catch (AssertionError e) {
|
} catch (AssertionError e) {
|
||||||
Assert.assertTrue(e.getMessage().contains(
|
Assert.assertTrue(
|
||||||
"Expected exactly one metric for name "));
|
e.getMessage().contains("Expected exactly one metric for name "));
|
||||||
}
|
}
|
||||||
// collect all metrics
|
// collect all metrics
|
||||||
checkApps(queueSource, 0, 0, 0, 0, 0, 0, true);
|
AppMetricsChecker.create()
|
||||||
|
.checkAgainst(queueSource, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void checkApps(MetricsSource source, int submitted, int pending,
|
private static void checkAggregatedNodeTypes(MetricsSource source,
|
||||||
int running, int completed, int failed, int killed, boolean all) {
|
|
||||||
MetricsRecordBuilder rb = getMetrics(source, all);
|
|
||||||
assertCounter("AppsSubmitted", submitted, rb);
|
|
||||||
assertGauge("AppsPending", pending, rb);
|
|
||||||
assertGauge("AppsRunning", running, rb);
|
|
||||||
assertCounter("AppsCompleted", completed, rb);
|
|
||||||
assertCounter("AppsFailed", failed, rb);
|
|
||||||
assertCounter("AppsKilled", killed, rb);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void checkResources(MetricsSource source, long allocatedMB,
|
|
||||||
int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
|
|
||||||
long aggreReleasedCtnrs, long availableMB, int availableCores, long pendingMB,
|
|
||||||
int pendingCores, int pendingCtnrs, long reservedMB, int reservedCores,
|
|
||||||
int reservedCtnrs) {
|
|
||||||
MetricsRecordBuilder rb = getMetrics(source);
|
|
||||||
assertGauge("AllocatedMB", allocatedMB, rb);
|
|
||||||
assertGauge("AllocatedVCores", allocatedCores, rb);
|
|
||||||
assertGauge("AllocatedContainers", allocCtnrs, rb);
|
|
||||||
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
|
|
||||||
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
|
|
||||||
assertGauge("AvailableMB", availableMB, rb);
|
|
||||||
assertGauge("AvailableVCores", availableCores, rb);
|
|
||||||
assertGauge("PendingMB", pendingMB, rb);
|
|
||||||
assertGauge("PendingVCores", pendingCores, rb);
|
|
||||||
assertGauge("PendingContainers", pendingCtnrs, rb);
|
|
||||||
assertGauge("ReservedMB", reservedMB, rb);
|
|
||||||
assertGauge("ReservedVCores", reservedCores, rb);
|
|
||||||
assertGauge("ReservedContainers", reservedCtnrs, rb);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static void checkAggregatedNodeTypes(MetricsSource source,
|
|
||||||
long nodeLocal, long rackLocal, long offSwitch) {
|
long nodeLocal, long rackLocal, long offSwitch) {
|
||||||
MetricsRecordBuilder rb = getMetrics(source);
|
MetricsRecordBuilder rb = getMetrics(source);
|
||||||
assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
|
assertCounter("AggregateNodeLocalContainersAllocated", nodeLocal, rb);
|
||||||
|
@ -459,14 +729,12 @@ public class TestQueueMetrics {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MetricsSource queueSource(MetricsSystem ms, String queue) {
|
public static MetricsSource queueSource(MetricsSystem ms, String queue) {
|
||||||
MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).toString());
|
return ms.getSource(QueueMetrics.sourceName(queue).toString());
|
||||||
return s;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static MetricsSource userSource(MetricsSystem ms, String queue,
|
private static MetricsSource userSource(MetricsSystem ms, String queue,
|
||||||
String user) {
|
String user) {
|
||||||
MetricsSource s = ms.getSource(QueueMetrics.sourceName(queue).
|
return ms.getSource(QueueMetrics.sourceName(queue).
|
||||||
append(",user=").append(user).toString());
|
append(",user=").append(user).toString());
|
||||||
return s;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue