YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth) (refactored patch for branch-2)

This commit is contained in:
Eric E Payne 2019-11-21 19:21:52 +00:00
parent 835bc686e0
commit a9f51e0372
9 changed files with 1490 additions and 168 deletions

View File

@ -16,6 +16,8 @@
package org.apache.hadoop.yarn.resourcetypes; package org.apache.hadoop.yarn.resourcetypes;
import com.google.common.collect.Maps;
import java.util.HashMap;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -90,4 +92,26 @@ public final class ResourceTypesTestHelper {
return new ResourceValueAndUnit(value, matcher.group(2)); return new ResourceValueAndUnit(value, matcher.group(2));
} }
/**
 * Collects every resource type of {@code res} whose name is neither
 * {@link ResourceInformation#MEMORY_URI} nor
 * {@link ResourceInformation#VCORES_URI}.
 *
 * @param res resource whose entries are inspected
 * @return map from custom resource name to its value
 */
public static Map<String, Long> extractCustomResources(Resource res) {
  Map<String, Long> custom = Maps.newHashMap();
  int index = 0;
  while (index < res.getResources().length) {
    ResourceInformation info = res.getResourceInformation(index);
    index++;
    String name = info.getName();
    boolean standard = name.equals(ResourceInformation.MEMORY_URI)
        || name.equals(ResourceInformation.VCORES_URI);
    if (!standard) {
      custom.put(name, info.getValue());
    }
  }
  return custom;
}
/**
 * Same selection as {@link #extractCustomResources(Resource)}, with every
 * value converted through {@link String#valueOf(Object)}.
 *
 * @param res resource whose entries are inspected
 * @return map from custom resource name to its value as a string
 */
public static Map<String, String> extractCustomResourcesAsStrings(
    Resource res) {
  Map<String, String> rendered = new HashMap<>();
  for (Map.Entry<String, Long> custom
      : extractCustomResources(res).entrySet()) {
    rendered.put(custom.getKey(), String.valueOf(custom.getValue()));
  }
  return rendered;
}
} }

View File

@ -46,7 +46,9 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.QueueMetricsForCustomResources.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -119,6 +121,7 @@ public class QueueMetrics implements MetricsSource {
protected final MetricsSystem metricsSystem; protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users; protected final Map<String, QueueMetrics> users;
protected final Configuration conf; protected final Configuration conf;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) { boolean enableUserMetrics, Configuration conf) {
@ -130,6 +133,11 @@ public class QueueMetrics implements MetricsSource {
metricsSystem = ms; metricsSystem = ms;
this.conf = conf; this.conf = conf;
runningTime = buildBuckets(conf); runningTime = buildBuckets(conf);
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
this.queueMetricsForCustomResources =
new QueueMetricsForCustomResources();
}
} }
protected QueueMetrics tag(MetricsInfo info, String value) { protected QueueMetrics tag(MetricsInfo info, String value) {
@ -355,9 +363,12 @@ public class QueueMetrics implements MetricsSource {
* @param limit resource limit * @param limit resource limit
*/ */
public void setAvailableResourcesToQueue(String partition, Resource limit) { public void setAvailableResourcesToQueue(String partition, Resource limit) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
availableMB.set(limit.getMemorySize()); availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores()); availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit);
}
} }
} }
@ -397,7 +408,7 @@ public class QueueMetrics implements MetricsSource {
*/ */
public void incrPendingResources(String partition, String user, public void incrPendingResources(String partition, String user,
int containers, Resource res) { int containers, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_incrPendingResources(containers, res); _incrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
@ -413,12 +424,15 @@ public class QueueMetrics implements MetricsSource {
pendingContainers.incr(containers); pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers); pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers); pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers);
}
} }
public void decrPendingResources(String partition, String user, public void decrPendingResources(String partition, String user,
int containers, Resource res) { int containers, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_decrPendingResources(containers, res); _decrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
@ -434,6 +448,9 @@ public class QueueMetrics implements MetricsSource {
pendingContainers.decr(containers); pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers); pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers); pendingVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res, containers);
}
} }
public void incrNodeTypeAggregations(String user, NodeType type) { public void incrNodeTypeAggregations(String user, NodeType type) {
@ -457,12 +474,16 @@ public class QueueMetrics implements MetricsSource {
public void allocateResources(String partition, String user, public void allocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) { int containers, Resource res, boolean decrPending) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.incr(containers); allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers); aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers); allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers); allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers);
}
if (decrPending) { if (decrPending) {
_decrPendingResources(containers, res); _decrPendingResources(containers, res);
} }
@ -484,12 +505,18 @@ public class QueueMetrics implements MetricsSource {
* @param res * @param res
*/ */
public void allocateResources(String partition, String user, Resource res) { public void allocateResources(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedMB.incr(res.getMemorySize()); allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores()); allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res);
}
pendingMB.decr(res.getMemorySize()); pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores()); pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
}
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
@ -503,11 +530,15 @@ public class QueueMetrics implements MetricsSource {
public void releaseResources(String partition, public void releaseResources(String partition,
String user, int containers, Resource res) { String user, int containers, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.decr(containers); allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers); aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers); allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers); allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers);
}
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.releaseResources(partition, user, containers, res); userMetrics.releaseResources(partition, user, containers, res);
@ -524,9 +555,13 @@ public class QueueMetrics implements MetricsSource {
* @param user * @param user
* @param res * @param res
*/ */
public void releaseResources(String user, Resource res) { private void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemorySize()); allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores()); allocatedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res);
}
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.releaseResources(user, res); userMetrics.releaseResources(user, res);
@ -557,6 +592,17 @@ public class QueueMetrics implements MetricsSource {
} }
} }
/**
 * Records preempted seconds for the custom resource types of {@code res}
 * on this metrics object, then forwards the same call to the parent
 * metrics when there is one.
 *
 * @param res resource of the preempted container
 * @param seconds multiplier applied to each custom resource value
 */
public void updatePreemptedSecondsForCustomResources(Resource res,
    long seconds) {
  final QueueMetricsForCustomResources custom =
      queueMetricsForCustomResources;
  // The field is absent when no custom resource types are tracked.
  if (custom != null) {
    custom.increaseAggregatedPreemptedSeconds(res, seconds);
  }
  if (parent == null) {
    return;
  }
  parent.updatePreemptedSecondsForCustomResources(res, seconds);
}
public void updatePreemptedResources(Resource res) { public void updatePreemptedResources(Resource res) {
aggregateMemoryMBPreempted.incr(res.getMemorySize()); aggregateMemoryMBPreempted.incr(res.getMemorySize());
aggregateVcoresPreempted.incr(res.getVirtualCores()); aggregateVcoresPreempted.incr(res.getVirtualCores());
@ -566,7 +612,7 @@ public class QueueMetrics implements MetricsSource {
} }
public void reserveResource(String partition, String user, Resource res) { public void reserveResource(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
reserveResource(user, res); reserveResource(user, res);
} }
} }
@ -575,6 +621,9 @@ public class QueueMetrics implements MetricsSource {
reservedContainers.incr(); reservedContainers.incr();
reservedMB.incr(res.getMemorySize()); reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores()); reservedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseReserved(res);
}
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.reserveResource(user, res); userMetrics.reserveResource(user, res);
@ -584,10 +633,13 @@ public class QueueMetrics implements MetricsSource {
} }
} }
public void unreserveResource(String user, Resource res) { private void unreserveResource(String user, Resource res) {
reservedContainers.decr(); reservedContainers.decr();
reservedMB.decr(res.getMemorySize()); reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores()); reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseReserved(res);
}
QueueMetrics userMetrics = getUserMetrics(user); QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) { if (userMetrics != null) {
userMetrics.unreserveResource(user, res); userMetrics.unreserveResource(user, res);
@ -598,7 +650,7 @@ public class QueueMetrics implements MetricsSource {
} }
public void unreserveResource(String partition, String user, Resource res) { public void unreserveResource(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
unreserveResource(user, res); unreserveResource(user, res);
} }
} }
@ -662,8 +714,57 @@ public class QueueMetrics implements MetricsSource {
} }
public Resource getAllocatedResources() { public Resource getAllocatedResources() {
return BuilderUtils.newResource(allocatedMB.value(), if (queueMetricsForCustomResources != null) {
(int) allocatedVCores.value()); return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(),
queueMetricsForCustomResources.getAllocatedValues());
}
return Resource.newInstance(allocatedMB.value(),
allocatedVCores.value());
}
/**
 * Builds a Resource from the available MB and vcores gauges, adding the
 * available custom resource values when custom metrics are tracked.
 *
 * @return newly created Resource describing the available amounts
 */
public Resource getAvailableResources() {
  if (queueMetricsForCustomResources == null) {
    return Resource.newInstance(availableMB.value(), availableVCores.value());
  }
  return Resource.newInstance(availableMB.value(), availableVCores.value(),
      queueMetricsForCustomResources.getAvailableValues());
}
/**
 * Builds a Resource from the pending MB and vcores gauges, adding the
 * pending custom resource values when custom metrics are tracked.
 *
 * @return newly created Resource describing the pending amounts
 */
public Resource getPendingResources() {
  if (queueMetricsForCustomResources == null) {
    return Resource.newInstance(pendingMB.value(), pendingVCores.value());
  }
  return Resource.newInstance(pendingMB.value(), pendingVCores.value(),
      queueMetricsForCustomResources.getPendingValues());
}
/**
 * Builds a Resource from the reserved MB and vcores gauges, adding the
 * reserved custom resource values when custom metrics are tracked.
 *
 * @return newly created Resource describing the reserved amounts
 */
public Resource getReservedResources() {
  if (queueMetricsForCustomResources == null) {
    return Resource.newInstance(reservedMB.value(), reservedVCores.value());
  }
  return Resource.newInstance(reservedMB.value(), reservedVCores.value(),
      queueMetricsForCustomResources.getReservedValues());
}
/**
 * Returns the holder of aggregated preempted seconds for custom resource
 * types.
 * The holder is handed out instead of a Resource because these values are
 * longs and could be truncated if they were cast into an int parameter of
 * Resource.newInstance (vCores).
 * The custom metrics object only exists when more than two resource types
 * are known; when it is absent an empty holder is returned rather than
 * failing with a NullPointerException.
 *
 * @return QueueMetricsCustomResource with the aggregated values, or an
 *         empty one when custom resource metrics are not tracked
 */
@VisibleForTesting
public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() {
  if (queueMetricsForCustomResources == null) {
    return new QueueMetricsCustomResource();
  }
  return queueMetricsForCustomResources.getAggregatePreemptedSeconds();
}
/**
 * Exposes the {@code aggregateMemoryMBSecondsPreempted} counter.
 *
 * @return the counter object itself, not a snapshot of its value
 */
@VisibleForTesting
public MutableCounterLong getAggregateMemoryMBSecondsPreempted() {
return aggregateMemoryMBSecondsPreempted;
}
@VisibleForTesting
public MutableCounterLong getAggregateVcoreSecondsPreempted() {
return aggregateVcoreSecondsPreempted;
} }
public long getAllocatedMB() { public long getAllocatedMB() {

View File

@ -0,0 +1,195 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.collect.Maps;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.util.Map;
/**
 * Entry point for queue metrics that concern custom resource types.
 * One {@link QueueMetricsCustomResource} holder is kept per metric kind
 * (aggregated preempted seconds, allocated, available, pending, reserved)
 * and this class exposes the increase, decrease and set operations on them.
 */
public class QueueMetricsForCustomResources {

  /**
   * Holds the values of one metric kind for custom resources in a map keyed
   * with the name of the custom resource.
   * The backing map is a plain hash map and this class performs no
   * synchronization of its own.
   */
  public static class QueueMetricsCustomResource {
    /**
     * Index of the first entry of {@link Resource#getResources()} that is
     * recorded here; entries before it are skipped (presumably memory and
     * vcores, which are tracked elsewhere -- confirm). Updates are also
     * ignored unless more than this many resource types are known.
     */
    private static final int FIRST_CUSTOM_INDEX = 2;

    private final Map<String, Long> values = Maps.newHashMap();

    /**
     * Adds each custom resource value of {@code res} to the stored value.
     *
     * @param res resource whose custom values are added
     */
    protected void increase(Resource res) {
      addScaled(res, 1L);
    }

    /**
     * Adds each custom resource value of {@code res}, multiplied by
     * {@code multiplier}, to the stored value.
     *
     * @param res resource whose custom values are added
     * @param multiplier factor applied to every value before adding
     */
    void increaseWithMultiplier(Resource res, long multiplier) {
      addScaled(res, multiplier);
    }

    /**
     * Subtracts each custom resource value of {@code res} from the stored
     * value.
     *
     * @param res resource whose custom values are subtracted
     */
    protected void decrease(Resource res) {
      addScaled(res, -1L);
    }

    /**
     * Subtracts each custom resource value of {@code res}, multiplied by
     * {@code containers}, from the stored value.
     *
     * @param res resource whose custom values are subtracted
     * @param containers factor applied to every value before subtracting
     */
    void decreaseWithMultiplier(Resource res, int containers) {
      // Widen to long before negating so the negation itself cannot
      // overflow for Integer.MIN_VALUE.
      addScaled(res, -((long) containers));
    }

    /**
     * Overwrites the stored value of each custom resource with the value
     * found in {@code res}. Names absent from {@code res} are left alone.
     *
     * @param res resource whose custom values are stored
     */
    protected void set(Resource res) {
      if (!customResourceTypesKnown()) {
        return;
      }
      ResourceInformation[] resources = res.getResources();
      for (int i = FIRST_CUSTOM_INDEX; i < resources.length; i++) {
        values.put(resources[i].getName(), resources[i].getValue());
      }
    }

    /**
     * @return the live backing map (not a copy), keyed by resource name
     */
    public Map<String, Long> getValues() {
      return values;
    }

    /** Tells whether any resource type beyond the skipped ones is known. */
    private static boolean customResourceTypesKnown() {
      return ResourceUtils.getNumberOfKnownResourceTypes()
          > FIRST_CUSTOM_INDEX;
    }

    /**
     * Shared implementation of all increase and decrease variants: adds
     * {@code value * factor} to the stored value of every custom resource
     * in {@code res}, treating a missing entry as zero.
     */
    private void addScaled(Resource res, long factor) {
      if (!customResourceTypesKnown()) {
        return;
      }
      ResourceInformation[] resources = res.getResources();
      for (int i = FIRST_CUSTOM_INDEX; i < resources.length; i++) {
        String name = resources[i].getName();
        Long current = values.get(name);
        long base = current == null ? 0L : current;
        values.put(name, base + resources[i].getValue() * factor);
      }
    }
  }

  private final QueueMetricsCustomResource aggregatePreemptedSeconds =
      new QueueMetricsCustomResource();
  private final QueueMetricsCustomResource allocated =
      new QueueMetricsCustomResource();
  private final QueueMetricsCustomResource available =
      new QueueMetricsCustomResource();
  private final QueueMetricsCustomResource pending =
      new QueueMetricsCustomResource();
  private final QueueMetricsCustomResource reserved =
      new QueueMetricsCustomResource();

  /** Adds the custom values of {@code res} to the reserved metrics. */
  public void increaseReserved(Resource res) {
    reserved.increase(res);
  }

  /** Subtracts the custom values of {@code res} from the reserved metrics. */
  public void decreaseReserved(Resource res) {
    reserved.decrease(res);
  }

  /** Overwrites the available metrics with the custom values of {@code res}. */
  public void setAvailable(Resource res) {
    available.set(res);
  }

  /** Adds {@code containers} times the custom values to the pending metrics. */
  public void increasePending(Resource res, int containers) {
    pending.increaseWithMultiplier(res, containers);
  }

  /** Subtracts the custom values of {@code res} from the pending metrics. */
  public void decreasePending(Resource res) {
    pending.decrease(res);
  }

  /** Subtracts {@code containers} times the custom values from pending. */
  public void decreasePending(Resource res, int containers) {
    pending.decreaseWithMultiplier(res, containers);
  }

  /** Adds the custom values of {@code res} to the allocated metrics. */
  public void increaseAllocated(Resource res) {
    allocated.increase(res);
  }

  /** Adds {@code containers} times the custom values to allocated. */
  public void increaseAllocated(Resource res, int containers) {
    allocated.increaseWithMultiplier(res, containers);
  }

  /** Subtracts the custom values of {@code res} from the allocated metrics. */
  public void decreaseAllocated(Resource res) {
    allocated.decrease(res);
  }

  /** Subtracts {@code containers} times the custom values from allocated. */
  public void decreaseAllocated(Resource res, int containers) {
    allocated.decreaseWithMultiplier(res, containers);
  }

  /** Adds {@code seconds} times the custom values to preempted seconds. */
  public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) {
    aggregatePreemptedSeconds.increaseWithMultiplier(res, seconds);
  }

  Map<String, Long> getAllocatedValues() {
    return allocated.getValues();
  }

  Map<String, Long> getAvailableValues() {
    return available.getValues();
  }

  Map<String, Long> getPendingValues() {
    return pending.getValues();
  }

  Map<String, Long> getReservedValues() {
    return reserved.getValues();
  }

  QueueMetricsCustomResource getAggregatePreemptedSeconds() {
    return aggregatePreemptedSeconds;
  }
}

View File

@ -1727,7 +1727,8 @@ public class CapacityScheduler extends
private void updateQueuePreemptionMetrics( private void updateQueuePreemptionMetrics(
CSQueue queue, RMContainer rmc) { CSQueue queue, RMContainer rmc) {
QueueMetrics qMetrics = queue.getMetrics(); QueueMetrics qMetrics = queue.getMetrics();
long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
Resource containerResource = rmc.getAllocatedResource(); Resource containerResource = rmc.getAllocatedResource();
qMetrics.preemptContainer(); qMetrics.preemptContainer();
long mbSeconds = (containerResource.getMemorySize() * usedMillis) long mbSeconds = (containerResource.getMemorySize() * usedMillis)
@ -1736,6 +1737,8 @@ public class CapacityScheduler extends
/ DateUtils.MILLIS_PER_SECOND; / DateUtils.MILLIS_PER_SECOND;
qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds); qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds);
qMetrics.updatePreemptedVcoreSeconds(vcSeconds); qMetrics.updatePreemptedVcoreSeconds(vcSeconds);
qMetrics.updatePreemptedSecondsForCustomResources(containerResource,
usedSeconds);
qMetrics.updatePreemptedResources(containerResource); qMetrics.updatePreemptedResources(containerResource);
} }

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.TestQueueMetrics.userSource;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
/**
 * Test fixture that bundles the queue metrics of one queue with the
 * metrics sources of the queue and of one user.
 * Every instance also keeps a reference to the fixture of its parent
 * queue; that reference is null for the root.
 */
public final class QueueInfo {
  private final QueueInfo parentQueueInfo;
  private final Queue queue;
  final QueueMetrics queueMetrics;
  final MetricsSource queueSource;
  final MetricsSource userSource;

  public QueueInfo(QueueInfo parent, String queueName, MetricsSystem ms,
      Configuration conf, String user) {
    this.parentQueueInfo = parent;
    Queue parentQueue = null;
    if (parent != null) {
      parentQueue = parent.queue;
    }
    this.queueMetrics =
        QueueMetrics.forQueue(ms, queueName, parentQueue, true, conf);
    this.queue = mock(Queue.class);
    when(this.queue.getMetrics()).thenReturn(this.queueMetrics);
    String sourceName = QueueMetrics.sourceName(queueName).toString();
    this.queueSource = ms.getSource(sourceName);

    // getUserMetrics has to run first so that the userSource(..) lookup
    // below returns a non-null source
    this.queueMetrics.getUserMetrics(user);
    this.userSource = userSource(ms, queueName, user);
  }

  /**
   * @return the topmost ancestor, or this instance when it has no parent
   */
  public QueueInfo getRoot() {
    return parentQueueInfo == null ? this : parentQueueInfo.getRoot();
  }

  /**
   * Runs a fresh copy of {@code checker} against the queue source of this
   * queue and then of each ancestor, walking upwards to the root.
   *
   * @param checker expectations to verify on every level
   */
  public void checkAllQueueSources(ResourceMetricsChecker checker) {
    for (QueueInfo current = this; current != null;
        current = current.parentQueueInfo) {
      ResourceMetricsChecker.createFromChecker(checker)
          .checkAgainst(current.queueSource);
    }
  }

  public QueueInfo getParentQueueInfo() {
    return parentQueueInfo;
  }
}

View File

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
import java.util.Map;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
.extractCustomResources;
/**
 * Holder of the inputs used when testing standard and custom resource
 * metrics of any type (allocated, pending, reserved and others).
 * Instances are created through {@link Builder}.
 */
public final class QueueMetricsTestData {

  /** Fluent builder; obtain one with {@link #create()}. */
  public static final class Builder {
    private String partition;
    private String user;
    private QueueInfo queueInfo;
    private Resource resource;
    private int containers;
    private Resource resourceToDecrease;
    private int containersToDecrease;

    private Builder() {
    }

    public static Builder create() {
      return new Builder();
    }

    public Builder withPartition(String partition) {
      this.partition = partition;
      return this;
    }

    public Builder withUser(String user) {
      this.user = user;
      return this;
    }

    public Builder withLeafQueue(QueueInfo qInfo) {
      this.queueInfo = qInfo;
      return this;
    }

    public Builder withResources(Resource res) {
      this.resource = res;
      return this;
    }

    public Builder withContainers(int containers) {
      this.containers = containers;
      return this;
    }

    public Builder withResourceToDecrease(Resource res, int containers) {
      this.containersToDecrease = containers;
      this.resourceToDecrease = res;
      return this;
    }

    /**
     * Creates the test data; the custom resource values are extracted from
     * the resource given to {@link #withResources(Resource)} at this point.
     */
    public QueueMetricsTestData build() {
      return new QueueMetricsTestData(this);
    }
  }

  final Map<String, Long> customResourceValues;
  final int containers;
  final Resource resourceToDecrease;
  final int containersToDecrease;
  final Resource resource;
  final String partition;
  final QueueInfo leafQueue;
  final String user;

  private QueueMetricsTestData(Builder source) {
    this.customResourceValues = extractCustomResources(source.resource);
    this.partition = source.partition;
    this.user = source.user;
    this.leafQueue = source.queueInfo;
    this.resource = source.resource;
    this.containers = source.containers;
    this.resourceToDecrease = source.resourceToDecrease;
    this.containersToDecrease = source.containersToDecrease;
  }
}

View File

@ -27,34 +27,31 @@ import java.util.Map;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.COUNTER_LONG;
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_INT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_LONG;
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
final class ResourceMetricsChecker { final class ResourceMetricsChecker {
private final static Logger LOG = private final static Logger LOG =
LoggerFactory.getLogger(ResourceMetricsChecker.class); LoggerFactory.getLogger(ResourceMetricsChecker.class);
/**
 * Kind of metric a {@link ResourceMetricsKey} is declared as. The builder
 * methods of the checker ({@code gaugeLong}, {@code gaugeInt},
 * {@code counter}) compare a key's declared type against the type implied by
 * the builder method being called.
 */
enum ResourceMetricType {
  /** Gauge registered through {@code gaugeInt(key, int)}. */
  GAUGE_INT,
  /** Gauge registered through {@code gaugeLong(key, long)}. */
  GAUGE_LONG,
  /** Integer counter; NOTE(review): no key visible here uses it - confirm. */
  COUNTER_INT,
  /** Counter registered through {@code counter(key, long)}. */
  COUNTER_LONG
}
private static final ResourceMetricsChecker INITIAL_CHECKER = private static final ResourceMetricsChecker INITIAL_CHECKER =
new ResourceMetricsChecker() new ResourceMetricsChecker()
.gaugeLong(ALLOCATED_MB, 0) .gaugeLong(ALLOCATED_MB, 0)
@ -72,29 +69,41 @@ final class ResourceMetricsChecker {
.gaugeInt(RESERVED_CONTAINERS, 0); .gaugeInt(RESERVED_CONTAINERS, 0);
enum ResourceMetricsKey { enum ResourceMetricsKey {
ALLOCATED_MB("AllocatedMB"), ALLOCATED_MB("AllocatedMB", GAUGE_LONG),
ALLOCATED_V_CORES("AllocatedVCores"), ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT),
ALLOCATED_CONTAINERS("AllocatedContainers"), ALLOCATED_CONTAINERS("AllocatedContainers", GAUGE_INT),
AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"), AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated",
AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"), COUNTER_LONG),
AVAILABLE_MB("AvailableMB"), AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased",
AVAILABLE_V_CORES("AvailableVCores"), COUNTER_LONG),
PENDING_MB("PendingMB"), AVAILABLE_MB("AvailableMB", GAUGE_LONG),
PENDING_V_CORES("PendingVCores"), AVAILABLE_V_CORES("AvailableVCores", GAUGE_INT),
PENDING_CONTAINERS("PendingContainers"), PENDING_MB("PendingMB", GAUGE_LONG),
RESERVED_MB("ReservedMB"), PENDING_V_CORES("PendingVCores", GAUGE_INT),
RESERVED_V_CORES("ReservedVCores"), PENDING_CONTAINERS("PendingContainers", GAUGE_INT),
RESERVED_CONTAINERS("ReservedContainers"); RESERVED_MB("ReservedMB", GAUGE_LONG),
RESERVED_V_CORES("ReservedVCores", GAUGE_INT),
RESERVED_CONTAINERS("ReservedContainers", GAUGE_INT),
AGGREGATE_VCORE_SECONDS_PREEMPTED(
"AggregateVcoreSecondsPreempted", COUNTER_LONG),
AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED(
"AggregateMemoryMBSecondsPreempted", COUNTER_LONG);
private String value; private String value;
private ResourceMetricType type;
ResourceMetricsKey(String value) { ResourceMetricsKey(String value, ResourceMetricType type) {
this.value = value; this.value = value;
this.type = type;
} }
public String getValue() { public String getValue() {
return value; return value;
} }
public ResourceMetricType getType() {
return type;
}
} }
private final Map<ResourceMetricsKey, Long> gaugesLong; private final Map<ResourceMetricsKey, Long> gaugesLong;
@ -123,20 +132,31 @@ final class ResourceMetricsChecker {
} }
ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) { ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) {
ensureTypeIsCorrect(key, GAUGE_LONG);
gaugesLong.put(key, value); gaugesLong.put(key, value);
return this; return this;
} }
ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) { ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) {
ensureTypeIsCorrect(key, GAUGE_INT);
gaugesInt.put(key, value); gaugesInt.put(key, value);
return this; return this;
} }
ResourceMetricsChecker counter(ResourceMetricsKey key, long value) { ResourceMetricsChecker counter(ResourceMetricsKey key, long value) {
ensureTypeIsCorrect(key, COUNTER_LONG);
counters.put(key, value); counters.put(key, value);
return this; return this;
} }
/**
 * Verifies that a key is being registered through the builder method that
 * matches its declared metric type.
 *
 * @param key metrics key being registered
 * @param actualType type implied by the builder method that was called
 * @throws IllegalStateException if {@code key}'s declared type differs from
 *         {@code actualType}
 */
private void ensureTypeIsCorrect(ResourceMetricsKey key,
    ResourceMetricType actualType) {
  if (key.type == actualType) {
    return;
  }
  String message = "Metrics type should be " + key.type
      + " instead of " + actualType + " for metrics: " + key.value;
  throw new IllegalStateException(message);
}
ResourceMetricsChecker checkAgainst(MetricsSource source) { ResourceMetricsChecker checkAgainst(MetricsSource source) {
if (source == null) { if (source == null) {
throw new IllegalStateException("MetricsSource should not be null!"); throw new IllegalStateException("MetricsSource should not be null!");

View File

@ -18,15 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler; package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.AppMetricsChecker.AppMetricsKey.*;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSource;
@ -46,8 +37,40 @@ import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestQueueMetrics { public class TestQueueMetrics {
/**
 * Builds a Mockito mock of {@link Queue} whose {@code getMetrics()} call
 * returns the supplied metrics object.
 *
 * @param metrics metrics instance the mocked queue should return
 * @return the stubbed queue mock
 */
private static Queue createMockQueue(QueueMetrics metrics) {
  final Queue mockedQueue = mock(Queue.class);
  // Only getMetrics() is stubbed; every other call keeps Mockito defaults.
  when(mockedQueue.getMetrics()).thenReturn(metrics);
  return mockedQueue;
}
private static final int GB = 1024; // MB private static final int GB = 1024; // MB
// User name shared by several tests in this class when submitting apps and
// updating per-user metrics.
private static final String USER = "alice";
// Second user name; used by testSingleQueueWithUserMetrics.
private static final String USER_2 = "dodo";
private static final Configuration conf = new Configuration(); private static final Configuration conf = new Configuration();
private MetricsSystem ms; private MetricsSystem ms;
@ -60,19 +83,18 @@ public class TestQueueMetrics {
@Test @Test
public void testDefaultSingleQueueMetrics() { public void testDefaultSingleQueueMetrics() {
String queueName = "single"; String queueName = "single";
String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
conf); conf);
MetricsSource queueSource= queueSource(ms, queueName); MetricsSource queueSource= queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(user); metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create() AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
@ -80,7 +102,7 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3)); USER, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create() ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create()
@ -91,14 +113,14 @@ public class TestQueueMetrics {
.gaugeInt(PENDING_CONTAINERS, 5) .gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource); .checkAgainst(queueSource);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1) .gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true); USER, 3, Resources.createResource(2*GB, 2), true);
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeLong(ALLOCATED_MB, 6 * GB)
.gaugeInt(ALLOCATED_V_CORES, 6) .gaugeInt(ALLOCATED_V_CORES, 6)
@ -110,7 +132,7 @@ public class TestQueueMetrics {
.checkAgainst(queueSource); .checkAgainst(queueSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2)); USER, 1, Resources.createResource(2*GB, 2));
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4) .gaugeInt(ALLOCATED_V_CORES, 4)
@ -119,13 +141,13 @@ public class TestQueueMetrics {
.checkAgainst(queueSource); .checkAgainst(queueSource);
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2)); USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values //nothing should change in values
rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource); .checkAgainst(queueSource);
metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 0, Resources.createResource(2 * GB, 2)); USER, 0, Resources.createResource(2 * GB, 2));
//nothing should change in values //nothing should change in values
ResourceMetricsChecker.createFromChecker(rmChecker) ResourceMetricsChecker.createFromChecker(rmChecker)
.checkAgainst(queueSource); .checkAgainst(queueSource);
@ -136,7 +158,7 @@ public class TestQueueMetrics {
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.finishApp(user, RMAppState.FINISHED); metrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsChecker) AppMetricsChecker.createFromChecker(appMetricsChecker)
.counter(APPS_COMPLETED, 1) .counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
@ -146,24 +168,23 @@ public class TestQueueMetrics {
@Test @Test
public void testQueueAppMetricsForMultipleFailures() { public void testQueueAppMetricsForMultipleFailures() {
String queueName = "single"; String queueName = "single";
String user = "alice";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false,
new Configuration()); new Configuration());
MetricsSource queueSource = queueSource(ms, queueName); MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(USER);
metrics.submitApp(user); metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, USER);
AppMetricsChecker appMetricsChecker = AppMetricsChecker.create() AppMetricsChecker appMetricsChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1) .gaugeInt(APPS_RUNNING, 1)
@ -177,12 +198,12 @@ public class TestQueueMetrics {
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitAppAttempt(user); metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1) .gaugeInt(APPS_RUNNING, 1)
@ -197,12 +218,12 @@ public class TestQueueMetrics {
// As the application has failed, framework retries the same application // As the application has failed, framework retries the same application
// based on configuration // based on configuration
metrics.submitAppAttempt(user); metrics.submitAppAttempt(USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1) .gaugeInt(APPS_RUNNING, 1)
@ -215,7 +236,7 @@ public class TestQueueMetrics {
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
metrics.finishApp(user, RMAppState.FAILED); metrics.finishApp(USER, RMAppState.FAILED);
AppMetricsChecker.createFromChecker(appMetricsChecker) AppMetricsChecker.createFromChecker(appMetricsChecker)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.counter(APPS_FAILED, 1) .counter(APPS_FAILED, 1)
@ -227,15 +248,14 @@ public class TestQueueMetrics {
@Test @Test
public void testSingleQueueWithUserMetrics() { public void testSingleQueueWithUserMetrics() {
String queueName = "single2"; String queueName = "single2";
String user = "dodo";
QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true, QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true,
conf); conf);
MetricsSource queueSource = queueSource(ms, queueName); MetricsSource queueSource = queueSource(ms, queueName);
AppSchedulingInfo app = mockApp(user); AppSchedulingInfo app = mockApp(USER_2);
metrics.submitApp(user); metrics.submitApp(USER_2);
MetricsSource userSource = userSource(ms, queueName, user); MetricsSource userSource = userSource(ms, queueName, USER_2);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create() AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
@ -244,7 +264,7 @@ public class TestQueueMetrics {
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true); .checkAgainst(userSource, true);
metrics.submitAppAttempt(user); metrics.submitAppAttempt(USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker) .createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
@ -257,9 +277,9 @@ public class TestQueueMetrics {
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(10*GB, 10)); USER_2, Resources.createResource(10*GB, 10));
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3)); USER_2, 5, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
@ -280,7 +300,7 @@ public class TestQueueMetrics {
.gaugeInt(PENDING_CONTAINERS, 5) .gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource); .checkAgainst(userSource);
metrics.runAppAttempt(app.getApplicationId(), user); metrics.runAppAttempt(app.getApplicationId(), USER_2);
appMetricsQueueSourceChecker = AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker) .createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
@ -293,7 +313,7 @@ public class TestQueueMetrics {
.checkAgainst(userSource, true); .checkAgainst(userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, metrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true); USER_2, 3, Resources.createResource(2*GB, 2), true);
resMetricsQueueSourceChecker = resMetricsQueueSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeLong(ALLOCATED_MB, 6 * GB)
@ -316,7 +336,7 @@ public class TestQueueMetrics {
.checkAgainst(userSource); .checkAgainst(userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, metrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2)); USER_2, 1, Resources.createResource(2*GB, 2));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4) .gaugeInt(ALLOCATED_V_CORES, 4)
@ -340,7 +360,7 @@ public class TestQueueMetrics {
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true); .checkAgainst(userSource, true);
metrics.finishApp(user, RMAppState.FINISHED); metrics.finishApp(USER_2, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1) .counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true); .checkAgainst(queueSource, true);
@ -353,7 +373,6 @@ public class TestQueueMetrics {
public void testNodeTypeMetrics() { public void testNodeTypeMetrics() {
String parentQueueName = "root"; String parentQueueName = "root";
String leafQueueName = "root.leaf"; String leafQueueName = "root.leaf";
String user = "alice";
QueueMetrics parentMetrics = QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); QueueMetrics.forQueue(ms, parentQueueName, null, true, conf);
@ -365,29 +384,29 @@ public class TestQueueMetrics {
MetricsSource queueSource = queueSource(ms, leafQueueName); MetricsSource queueSource = queueSource(ms, leafQueueName);
//AppSchedulingInfo app = mockApp(user); //AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user); metrics.submitApp(USER);
MetricsSource userSource = userSource(ms, leafQueueName, user); MetricsSource userSource = userSource(ms, leafQueueName, USER);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user); MetricsSource parentUserSource = userSource(ms, parentQueueName, USER);
metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL); metrics.incrNodeTypeAggregations(USER, NodeType.NODE_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L); checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L); checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 0L, 0L); checkAggregatedNodeTypes(userSource, 1L, 0L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L); checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L);
metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL); metrics.incrNodeTypeAggregations(USER, NodeType.RACK_LOCAL);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L); checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 0L); checkAggregatedNodeTypes(userSource, 1L, 1L, 0L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L);
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L); checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 1L); checkAggregatedNodeTypes(userSource, 1L, 1L, 1L);
checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L);
metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH);
checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L); checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L);
checkAggregatedNodeTypes(userSource, 1L, 1L, 2L); checkAggregatedNodeTypes(userSource, 1L, 1L, 2L);
@ -396,67 +415,60 @@ public class TestQueueMetrics {
@Test @Test
public void testTwoLevelWithUserMetrics() { public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root"; AppSchedulingInfo app = mockApp(USER);
String leafQueueName = "root.leaf";
String user = "alice";
QueueMetrics parentMetrics = QueueInfo root = new QueueInfo(null, "root", ms, conf, USER);
QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER);
Queue parentQueue = mock(Queue.class); leaf.queueMetrics.submitApp(USER);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf);
MetricsSource parentQueueSource = queueSource(ms, parentQueueName);
MetricsSource queueSource = queueSource(ms, leafQueueName);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
MetricsSource userSource = userSource(ms, leafQueueName, user);
MetricsSource parentUserSource = userSource(ms, parentQueueName, user);
AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create() AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(queueSource, true); .checkAgainst(leaf.queueSource, true);
AppMetricsChecker appMetricsParentQueueSourceChecker = AppMetricsChecker appMetricsParentQueueSourceChecker =
AppMetricsChecker.create() AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(parentQueueSource, true); .checkAgainst(root.queueSource, true);
AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create() AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(userSource, true); .checkAgainst(leaf.userSource, true);
AppMetricsChecker appMetricsParentUserSourceChecker = AppMetricsChecker appMetricsParentUserSourceChecker =
AppMetricsChecker.create() AppMetricsChecker.create()
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.checkAgainst(parentUserSource, true); .checkAgainst(root.userSource, true);
metrics.submitAppAttempt(user); leaf.queueMetrics.submitAppAttempt(USER);
appMetricsQueueSourceChecker = appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(queueSource, true); .checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker = appMetricsParentQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker) AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(parentQueueSource, true); .checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker = appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(userSource, true); .checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker = appMetricsParentUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker) AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.gaugeInt(APPS_PENDING, 1) .gaugeInt(APPS_PENDING, 1)
.checkAgainst(parentUserSource, true); .checkAgainst(root.userSource, true);
parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, root.queueMetrics.setAvailableResourcesToQueue(
RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, leaf.queueMetrics.setAvailableResourcesToQueue(
RMNodeLabelsManager.NO_LABEL,
Resources.createResource(100*GB, 100)); Resources.createResource(100*GB, 100));
parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, root.queueMetrics.setAvailableResourcesToUser(
user, Resources.createResource(10*GB, 10)); RMNodeLabelsManager.NO_LABEL,
metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, USER, Resources.createResource(10*GB, 10));
user, Resources.createResource(10*GB, 10)); leaf.queueMetrics.setAvailableResourcesToUser(
metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, RMNodeLabelsManager.NO_LABEL,
user, 5, Resources.createResource(3*GB, 3)); USER, Resources.createResource(10*GB, 10));
leaf.queueMetrics.incrPendingResources(
RMNodeLabelsManager.NO_LABEL,
USER, 5, Resources.createResource(3*GB, 3));
ResourceMetricsChecker resMetricsQueueSourceChecker = ResourceMetricsChecker resMetricsQueueSourceChecker =
ResourceMetricsChecker.create() ResourceMetricsChecker.create()
@ -465,7 +477,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB) .gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5) .gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(queueSource); .checkAgainst(leaf.queueSource);
ResourceMetricsChecker resMetricsParentQueueSourceChecker = ResourceMetricsChecker resMetricsParentQueueSourceChecker =
ResourceMetricsChecker.create() ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 100 * GB) .gaugeLong(AVAILABLE_MB, 100 * GB)
@ -473,7 +485,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB) .gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5) .gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(parentQueueSource); .checkAgainst(root.queueSource);
ResourceMetricsChecker resMetricsUserSourceChecker = ResourceMetricsChecker resMetricsUserSourceChecker =
ResourceMetricsChecker.create() ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeLong(AVAILABLE_MB, 10 * GB)
@ -481,7 +493,7 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB) .gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5) .gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(userSource); .checkAgainst(leaf.userSource);
ResourceMetricsChecker resMetricsParentUserSourceChecker = ResourceMetricsChecker resMetricsParentUserSourceChecker =
ResourceMetricsChecker.create() ResourceMetricsChecker.create()
.gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeLong(AVAILABLE_MB, 10 * GB)
@ -489,24 +501,24 @@ public class TestQueueMetrics {
.gaugeLong(PENDING_MB, 15 * GB) .gaugeLong(PENDING_MB, 15 * GB)
.gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_V_CORES, 15)
.gaugeInt(PENDING_CONTAINERS, 5) .gaugeInt(PENDING_CONTAINERS, 5)
.checkAgainst(parentUserSource); .checkAgainst(root.userSource);
metrics.runAppAttempt(app.getApplicationId(), user); leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER);
appMetricsQueueSourceChecker = appMetricsQueueSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1) .gaugeInt(APPS_RUNNING, 1)
.checkAgainst(queueSource, true); .checkAgainst(leaf.queueSource, true);
appMetricsUserSourceChecker = appMetricsUserSourceChecker =
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 1) .gaugeInt(APPS_RUNNING, 1)
.checkAgainst(userSource, true); .checkAgainst(leaf.userSource, true);
metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, leaf.queueMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL,
user, 3, Resources.createResource(2*GB, 2), true); USER, 3, Resources.createResource(2*GB, 2), true);
metrics.reserveResource(RMNodeLabelsManager.NO_LABEL, leaf.queueMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3)); USER, Resources.createResource(3*GB, 3));
// Available resources is set externally, as it depends on dynamic // Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources // configurable cluster/queue resources
resMetricsQueueSourceChecker = resMetricsQueueSourceChecker =
@ -521,7 +533,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB) .gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1) .gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(queueSource); .checkAgainst(leaf.queueSource);
resMetricsParentQueueSourceChecker = resMetricsParentQueueSourceChecker =
ResourceMetricsChecker ResourceMetricsChecker
.createFromChecker(resMetricsParentQueueSourceChecker) .createFromChecker(resMetricsParentQueueSourceChecker)
@ -535,7 +547,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB) .gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1) .gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(parentQueueSource); .checkAgainst(root.queueSource);
resMetricsUserSourceChecker = resMetricsUserSourceChecker =
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeLong(ALLOCATED_MB, 6 * GB)
@ -548,7 +560,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB) .gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1) .gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(userSource); .checkAgainst(leaf.userSource);
resMetricsParentUserSourceChecker = ResourceMetricsChecker resMetricsParentUserSourceChecker = ResourceMetricsChecker
.createFromChecker(resMetricsParentUserSourceChecker) .createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeLong(ALLOCATED_MB, 6 * GB)
@ -561,12 +573,12 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 3 * GB) .gaugeLong(RESERVED_MB, 3 * GB)
.gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_V_CORES, 3)
.gaugeInt(RESERVED_CONTAINERS, 1) .gaugeInt(RESERVED_CONTAINERS, 1)
.checkAgainst(parentUserSource); .checkAgainst(root.userSource);
metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL,
user, 1, Resources.createResource(2*GB, 2)); USER, 1, Resources.createResource(2*GB, 2));
metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, leaf.queueMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL,
user, Resources.createResource(3*GB, 3)); USER, Resources.createResource(3*GB, 3));
ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4) .gaugeInt(ALLOCATED_V_CORES, 4)
@ -575,7 +587,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0) .gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0) .gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(queueSource); .checkAgainst(leaf.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4) .gaugeInt(ALLOCATED_V_CORES, 4)
@ -584,7 +596,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0) .gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0) .gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(parentQueueSource); .checkAgainst(root.queueSource);
ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4) .gaugeInt(ALLOCATED_V_CORES, 4)
@ -593,7 +605,7 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0) .gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0) .gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(userSource); .checkAgainst(leaf.userSource);
ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker) ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker)
.gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeLong(ALLOCATED_MB, 4 * GB)
.gaugeInt(ALLOCATED_V_CORES, 4) .gaugeInt(ALLOCATED_V_CORES, 4)
@ -602,46 +614,46 @@ public class TestQueueMetrics {
.gaugeLong(RESERVED_MB, 0) .gaugeLong(RESERVED_MB, 0)
.gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_V_CORES, 0)
.gaugeInt(RESERVED_CONTAINERS, 0) .gaugeInt(RESERVED_CONTAINERS, 0)
.checkAgainst(parentUserSource); .checkAgainst(root.userSource);
metrics.finishAppAttempt( leaf.queueMetrics.finishAppAttempt(
app.getApplicationId(), app.isPending(), app.getUser()); app.getApplicationId(), app.isPending(), app.getUser());
appMetricsQueueSourceChecker = AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsQueueSourceChecker) .createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(queueSource, true); .checkAgainst(leaf.queueSource, true);
appMetricsParentQueueSourceChecker = AppMetricsChecker appMetricsParentQueueSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentQueueSourceChecker) .createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(parentQueueSource, true); .checkAgainst(root.queueSource, true);
appMetricsUserSourceChecker = AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsUserSourceChecker) .createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(userSource, true); .checkAgainst(leaf.userSource, true);
appMetricsParentUserSourceChecker = AppMetricsChecker appMetricsParentUserSourceChecker = AppMetricsChecker
.createFromChecker(appMetricsParentUserSourceChecker) .createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_SUBMITTED, 1) .counter(APPS_SUBMITTED, 1)
.gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_PENDING, 0)
.gaugeInt(APPS_RUNNING, 0) .gaugeInt(APPS_RUNNING, 0)
.checkAgainst(parentUserSource, true); .checkAgainst(root.userSource, true);
metrics.finishApp(user, RMAppState.FINISHED); leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED);
AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker)
.counter(APPS_COMPLETED, 1) .counter(APPS_COMPLETED, 1)
.checkAgainst(queueSource, true); .checkAgainst(leaf.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker) AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker)
.counter(APPS_COMPLETED, 1) .counter(APPS_COMPLETED, 1)
.checkAgainst(parentQueueSource, true); .checkAgainst(root.queueSource, true);
AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker)
.counter(APPS_COMPLETED, 1) .counter(APPS_COMPLETED, 1)
.checkAgainst(userSource, true); .checkAgainst(leaf.userSource, true);
AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker) AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker)
.counter(APPS_COMPLETED, 1) .counter(APPS_COMPLETED, 1)
.checkAgainst(parentUserSource, true); .checkAgainst(root.userSource, true);
} }
@Test @Test
@ -719,7 +731,7 @@ public class TestQueueMetrics {
assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb); assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb);
} }
private static AppSchedulingInfo mockApp(String user) { static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class); AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user); when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1); ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
@ -732,7 +744,7 @@ public class TestQueueMetrics {
return ms.getSource(QueueMetrics.sourceName(queue).toString()); return ms.getSource(QueueMetrics.sourceName(queue).toString());
} }
private static MetricsSource userSource(MetricsSystem ms, String queue, public static MetricsSource userSource(MetricsSystem ms, String queue,
String user) { String user) {
return ms.getSource(QueueMetrics.sourceName(queue). return ms.getSource(QueueMetrics.sourceName(queue).
append(",user=").append(user).toString()); append(",user=").append(user).toString());

View File

@ -0,0 +1,781 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.QueueMetricsForCustomResources.QueueMetricsCustomResource;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper
.extractCustomResourcesAsStrings;
import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestQueueMetricsForCustomResources {
/**
 * Kinds of custom-resource metrics verified by this test class. The
 * assertion helpers include the constant in their failure messages.
 */
public enum MetricsForCustomResource {
  ALLOCATED,
  AVAILABLE,
  PENDING,
  RESERVED,
  AGGREGATE_PREEMPTED_SECONDS
}
/** Multiplier used for resource amounts in these tests (1024, i.e. MB). */
public static final long GB = 1024; // MB
/** Configuration passed to every QueueInfo / QueueMetrics created here. */
private static final Configuration CONF = new Configuration();
/** Name of the first custom resource type registered for the tests. */
private static final String CUSTOM_RES_1 = "custom_res_1";
/** Name of the second custom resource type registered for the tests. */
private static final String CUSTOM_RES_2 = "custom_res_2";
/** User name passed to the queues and test data built by the helpers. */
public static final String USER = "alice";
/** Resource carrying both custom resources; rebuilt in setUp(). */
private Resource defaultResource;
/** Metrics system instance; a new one is created in setUp(). */
private MetricsSystem ms;
/**
 * Prepares the state shared by every test: creates a new metrics system,
 * calls {@link QueueMetrics#clearQueueMetrics()}, registers the resource
 * types and rebuilds the default resource.
 */
@Before
public void setUp() {
  this.ms = new MetricsSystemImpl();
  QueueMetrics.clearQueueMetrics();
  // The default resource refers to the custom resource names that
  // initializeResourceTypes() registers, so registration runs first.
  initializeResourceTypes();
  createDefaultResource();
}
/**
 * Stores in {@code defaultResource} the result of
 * {@code newResource(4 * GB, 4, ...)} with 15 * GB of CUSTOM_RES_1 and
 * 20 * GB of CUSTOM_RES_2 as custom resource values.
 */
private void createDefaultResource() {
  final Map<String, String> customResources = ImmutableMap.of(
      CUSTOM_RES_1, String.valueOf(15 * GB),
      CUSTOM_RES_2, String.valueOf(20 * GB));
  defaultResource = newResource(4 * GB, 4, customResources);
}
/**
 * Registers four resource types through
 * {@code ResourceUtils.initializeResourcesFromResourceInformationMap}:
 * memory and vcores (bounded by the default scheduler minimum / maximum
 * allocation constants) plus the two custom resources (bounds 0 and 2000,
 * reusing the vcores units).
 */
private void initializeResourceTypes() {
  final Map<String, ResourceInformation> resourceTypes = new HashMap<>();

  final ResourceInformation memoryInfo = ResourceInformation.newInstance(
      ResourceInformation.MEMORY_MB.getName(),
      ResourceInformation.MEMORY_MB.getUnits(),
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
  final ResourceInformation vcoresInfo = ResourceInformation.newInstance(
      ResourceInformation.VCORES.getName(),
      ResourceInformation.VCORES.getUnits(),
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
      YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
  final ResourceInformation firstCustomInfo =
      ResourceInformation.newInstance(CUSTOM_RES_1,
          ResourceInformation.VCORES.getUnits(), 0, 2000);
  final ResourceInformation secondCustomInfo =
      ResourceInformation.newInstance(CUSTOM_RES_2,
          ResourceInformation.VCORES.getUnits(), 0, 2000);

  // Same insertion order as before: memory, vcores, then the custom types.
  resourceTypes.put(ResourceInformation.MEMORY_URI, memoryInfo);
  resourceTypes.put(ResourceInformation.VCORES_URI, vcoresInfo);
  resourceTypes.put(CUSTOM_RES_1, firstCustomInfo);
  resourceTypes.put(CUSTOM_RES_2, secondCustomInfo);

  ResourceUtils.initializeResourcesFromResourceInformationMap(resourceTypes);
}
/**
 * Reads the value of {@code resourceName} from {@code res} and asserts
 * that it equals {@code expectedValue}.
 *
 * @param metricsType metrics kind, used in the failure message
 * @param res resource to read the value from
 * @param resourceName name of the resource to look up
 * @param expectedValue value the resource is expected to have
 */
private static void assertCustomResourceValue(
    MetricsForCustomResource metricsType,
    Resource res,
    String resourceName,
    long expectedValue) {
  assertCustomResourceValueInternal(metricsType, resourceName,
      expectedValue, res.getResourceValue(resourceName));
}
/**
 * Asserts that {@code value} is not null and equals {@code expectedValue}.
 *
 * @param metricsType metrics kind, used in the failure message
 * @param resourceName resource name, used in the failure messages
 * @param expectedValue expected value of the resource
 * @param value actual value of the resource
 */
private static void assertCustomResourceValueInternal(
    MetricsForCustomResource metricsType, String resourceName, long
    expectedValue, Long value) {
  final String missingValueMessage =
      "QueueMetrics should have custom resource metrics value " +
      "for resource: " + resourceName;
  assertNotNull(missingValueMessage, value);

  final String mismatchMessage = String.format(
      "QueueMetrics should have custom resource metrics value %d " +
      "for resource: %s for metrics type %s",
      expectedValue, resourceName, metricsType);
  assertEquals(mismatchMessage, expectedValue, value.longValue());
}
/**
 * Returns an immutable map that assigns the string form of {@code value}
 * to both custom resource names.
 *
 * @param value the amount to use for CUSTOM_RES_1 and CUSTOM_RES_2
 * @return map from custom resource name to {@code String.valueOf(value)}
 */
private static Map<String, String> getCustomResourcesWithValue(long value) {
  final String valueAsString = String.valueOf(value);
  return ImmutableMap.of(
      CUSTOM_RES_1, valueAsString,
      CUSTOM_RES_2, valueAsString);
}
/**
 * Creates a chain of four QueueInfo objects, each one handed to the next
 * as its first constructor argument, and returns the last one.
 *
 * NOTE(review): the queue names do not mirror the nesting ("root.subQ2" is
 * created under "root.subQ") - confirm this is intentional.
 *
 * @return the QueueInfo named "root.subQ2.leafQ"
 */
private QueueInfo createFourLevelQueueHierarchy() {
  final QueueInfo level1 = new QueueInfo(null, "root", ms, CONF, USER);
  final QueueInfo level2 = new QueueInfo(level1, "root.subQ", ms, CONF, USER);
  final QueueInfo level3 =
      new QueueInfo(level2, "root.subQ2", ms, CONF, USER);
  final QueueInfo level4 =
      new QueueInfo(level3, "root.subQ2.leafQ", ms, CONF, USER);
  return level4;
}
/**
 * Creates a "root" QueueInfo and a "root.leaf" QueueInfo that receives the
 * root as its first constructor argument.
 *
 * @return the QueueInfo named "root.leaf"
 */
private QueueInfo createBasicQueueHierarchy() {
  final QueueInfo rootQueue = new QueueInfo(null, "root", ms, CONF, USER);
  final QueueInfo leafQueue =
      new QueueInfo(rootQueue, "root.leaf", ms, CONF, USER);
  return leafQueue;
}
/**
 * Returns the default test-data builder with the container count set.
 *
 * @param containers value passed to {@code withContainers}
 * @return builder pre-filled by createDefaultQueueMetricsTestData()
 */
private QueueMetricsTestData.Builder
    createQueueMetricsTestDataWithContainers(int containers) {
  final QueueMetricsTestData.Builder defaults =
      createDefaultQueueMetricsTestData();
  return defaults.withContainers(containers);
}
/**
 * Creates a test-data builder pre-filled with {@code USER} as the user and
 * {@code RMNodeLabelsManager.NO_LABEL} as the partition.
 *
 * @return the pre-filled builder
 */
private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() {
  return QueueMetricsTestData.Builder
      .create()
      .withUser(USER)
      .withPartition(RMNodeLabelsManager.NO_LABEL);
}
/**
 * Runs the pending-resources check using the container count stored in
 * {@code testData}.
 *
 * @param testData queue, user, partition, resource and container count
 */
private void testIncreasePendingResources(QueueMetricsTestData testData) {
  final int containers = testData.containers;
  testIncreasePendingResourcesInternal(containers, testData);
}
/**
 * Runs the pending-resources check with a fixed container count of 1,
 * ignoring the count stored in {@code testData}.
 *
 * @param testData queue, user, partition and resource to use
 */
private void testIncreasePendingResourcesWithoutContainer(
    QueueMetricsTestData testData) {
  final int singleContainer = 1;
  testIncreasePendingResourcesInternal(singleContainer, testData);
}
/**
 * Increments the pending resources of the leaf queue and verifies the
 * pending container / memory / vcore gauges and the pending custom
 * resource values, each expected to be the per-container amount times
 * {@code containers}.
 *
 * @param containers number of containers passed to incrPendingResources
 * @param testData queue, user, partition and resource to use
 */
private void testIncreasePendingResourcesInternal(int containers,
    QueueMetricsTestData testData) {
  final Resource resource = testData.resource;
  testData.leafQueue.queueMetrics.incrPendingResources(testData.partition,
      testData.user, containers, resource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker.create()
      .gaugeInt(PENDING_CONTAINERS, containers)
      .gaugeLong(PENDING_MB, containers * resource.getMemorySize())
      .gaugeInt(PENDING_V_CORES, containers * resource.getVirtualCores());

  // Scale every custom resource value by the number of containers.
  final Map<String, Long> expectedPending = new HashMap<>();
  for (String resourceName : testData.customResourceValues.keySet()) {
    expectedPending.put(resourceName,
        testData.customResourceValues.get(resourceName) * containers);
  }

  assertAllPendingMetrics(testData.leafQueue, checker,
      MetricsForCustomResource.PENDING, expectedPending);
}
/**
 * Allocates resources on the leaf queue and verifies the allocated gauges,
 * the aggregate allocated-containers counter and zeroed pending gauges
 * against the leaf queue source. Then checks custom resource values:
 * pending ones (expected 0) when {@code decreasePending} is set, and
 * allocated ones (per-container amount times the container count) when
 * the test data has custom resource values.
 *
 * @param decreasePending flag forwarded to allocateResources
 * @param testData queue, user, partition, resource and container count
 */
private void testAllocateResources(boolean decreasePending,
    QueueMetricsTestData testData) {
  final QueueInfo leaf = testData.leafQueue;
  final int containers = testData.containers;
  final Resource resource = testData.resource;

  leaf.queueMetrics.allocateResources(testData.partition,
      testData.user, containers, resource, decreasePending);

  final ResourceMetricsChecker checker = ResourceMetricsChecker.create()
      .gaugeInt(ALLOCATED_CONTAINERS, containers)
      .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
      .gaugeLong(ALLOCATED_MB, containers * resource.getMemorySize())
      .gaugeInt(ALLOCATED_V_CORES, containers * resource.getVirtualCores())
      .gaugeInt(PENDING_CONTAINERS, 0)
      .gaugeLong(PENDING_MB, 0)
      .gaugeInt(PENDING_V_CORES, 0)
      .checkAgainst(leaf.queueSource);

  if (decreasePending) {
    // NOTE(review): unlike the allocated check below, this runs even when
    // there are no custom resource values; the pending helper then looks up
    // CUSTOM_RES_1 in an empty map. Confirm callers never combine
    // decreasePending with an empty custom resource map.
    final Map<String, Long> expectedPending = new HashMap<>();
    for (String resourceName : testData.customResourceValues.keySet()) {
      expectedPending.put(resourceName, 0L);
    }
    assertAllPendingMetrics(leaf, checker,
        MetricsForCustomResource.PENDING, expectedPending);
  }

  if (!testData.customResourceValues.isEmpty()) {
    final Map<String, Long> expectedAllocated = new HashMap<>();
    for (String resourceName : testData.customResourceValues.keySet()) {
      expectedAllocated.put(resourceName,
          testData.customResourceValues.get(resourceName) * containers);
    }
    assertAllAllocatedMetrics(leaf, checker,
        MetricsForCustomResource.ALLOCATED, expectedAllocated);
  }
}
/**
 * Updates the preempted memory, vcore and custom resource seconds of the
 * leaf queue and verifies the aggregate preempted counters and custom
 * resource values, each expected to be the resource amount times
 * {@code seconds}. Only queue-level metrics are checked.
 *
 * @param testData queue and resource to use
 * @param seconds multiplier applied to every resource amount
 */
private void testUpdatePreemptedSeconds(QueueMetricsTestData testData,
    int seconds) {
  final QueueMetrics leafMetrics = testData.leafQueue.queueMetrics;
  final Resource resource = testData.resource;

  leafMetrics.updatePreemptedMemoryMBSeconds(
      resource.getMemorySize() * seconds);
  leafMetrics.updatePreemptedVcoreSeconds(
      resource.getVirtualCores() * seconds);
  leafMetrics.updatePreemptedSecondsForCustomResources(resource, seconds);

  final ResourceMetricsChecker checker = ResourceMetricsChecker.create()
      .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED,
          resource.getMemorySize() * seconds)
      .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED,
          resource.getVirtualCores() * seconds);

  final Map<String, Long> expectedPreempted = new HashMap<>();
  for (String resourceName : testData.customResourceValues.keySet()) {
    expectedPreempted.put(resourceName,
        testData.customResourceValues.get(resourceName) * seconds);
  }

  assertQueuePreemptedMetricsOnly(testData.leafQueue, checker,
      MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS,
      expectedPreempted);
}
/**
 * Packs the aggregate preempted-seconds metrics of {@code qm} into a
 * Resource: memory MB-seconds, vcore-seconds (narrowed to int) and the
 * custom resource values.
 *
 * @param qm queue metrics to read the preempted-seconds values from
 * @return a new Resource holding those three groups of values
 */
private Resource convertPreemptedSecondsToResource(QueueMetrics qm) {
  final QueueMetricsCustomResource preemptedCustom =
      qm.getAggregatedPreemptedSecondsResources();
  final MutableCounterLong preemptedVcores =
      qm.getAggregateVcoreSecondsPreempted();
  final MutableCounterLong preemptedMemory =
      qm.getAggregateMemoryMBSecondsPreempted();

  final long memoryValue = preemptedMemory.value();
  // Resource.newInstance takes the vcores as int, hence the narrowing cast.
  final int vcoresValue = (int) preemptedVcores.value();
  return Resource.newInstance(memoryValue, vcoresValue,
      preemptedCustom.getValues());
}
/**
 * Reserves the test resource on the leaf queue and verifies that one
 * container is reserved and that the reserved memory, vcores and custom
 * resource values equal those of the resource.
 *
 * @param testData queue, user, partition and resource to use
 */
private void testReserveResources(QueueMetricsTestData testData) {
  final QueueInfo leaf = testData.leafQueue;
  final Resource resource = testData.resource;

  leaf.queueMetrics.reserveResource(testData.partition, testData.user,
      resource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker.create()
      .gaugeInt(RESERVED_CONTAINERS, 1)
      .gaugeLong(RESERVED_MB, resource.getMemorySize())
      .gaugeInt(RESERVED_V_CORES, resource.getVirtualCores())
      .checkAgainst(leaf.queueSource);

  assertAllReservedMetrics(leaf, checker,
      MetricsForCustomResource.RESERVED, testData.customResourceValues);
}
/**
 * Allocates resources without decreasing pending ones, then, if the test
 * data has custom resource values, verifies that getAllocatedResources()
 * reports each custom resource as its per-container amount times the
 * container count.
 *
 * @param testData queue, user, partition, resource and container count
 */
private void testGetAllocatedResources(QueueMetricsTestData testData) {
  testAllocateResources(false, testData);

  final Resource allocated =
      testData.leafQueue.queueMetrics.getAllocatedResources();
  if (testData.customResourceValues.isEmpty()) {
    // Nothing custom to verify.
    return;
  }

  for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    final long expectedValue =
        testData.customResourceValues.get(resourceName)
            * testData.containers;
    assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED,
        resourceName, expectedValue,
        allocated.getResourceValue(resourceName));
  }
}
/**
 * Verifies pending metrics on the queue sources (via
 * assertAllPendingQueueMetrics) and then runs the checker against the
 * user source of {@code queueInfo} and of its root queue.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertAllPendingMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllPendingQueueMetrics(queueInfo, checker, metricsType,
      expectedCustomResourceValues);

  // User-level sources: first the given queue's, then the root queue's.
  final ResourceMetricsChecker queueUserChecker =
      ResourceMetricsChecker.createFromChecker(checker)
          .checkAgainst(queueInfo.userSource);
  ResourceMetricsChecker.createFromChecker(queueUserChecker)
      .checkAgainst(queueInfo.getRoot().userSource);
}
/**
 * Verifies pending metrics on the queue sources only; user sources are
 * not checked. Delegates to assertAllPendingQueueMetrics.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertQueuePendingMetricsOnly(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllPendingQueueMetrics(
      queueInfo, checker, metricsType, expectedCustomResourceValues);
}
/**
 * Runs the checker against all queue sources of {@code queueInfo}, then
 * verifies the pending value of CUSTOM_RES_1 and CUSTOM_RES_2 on the queue
 * and each of its ancestors.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource;
 *     both custom resource names are looked up in it
 */
private void assertAllPendingQueueMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  // Normal (memory / vcores / containers) metrics first.
  queueInfo.checkAllQueueSources(checker);

  // Then the custom resource metrics, one resource after the other.
  for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    checkAllPendingQueueMetrics(queueInfo, metricsType, resourceName,
        expectedCustomResourceValues.get(resourceName));
  }
}
/**
 * Asserts that the pending amount of {@code resName} equals
 * {@code resValue} on {@code queueInfo} and on every queue reachable
 * through getParentQueueInfo().
 *
 * @param queueInfo first queue to check
 * @param metricsType metrics kind, used in failure messages
 * @param resName name of the custom resource
 * @param resValue expected pending value on every visited queue
 */
private void checkAllPendingQueueMetrics(QueueInfo queueInfo,
    MetricsForCustomResource metricsType, String resName, long resValue) {
  QueueInfo current = queueInfo;
  // Walk up the parent chain; the starting queue is always checked.
  do {
    assertCustomResourceValue(metricsType,
        current.queueMetrics.getPendingResources(), resName, resValue);
    current = current.getParentQueueInfo();
  } while (current != null);
}
/**
 * Verifies allocated metrics on the queue sources (via
 * assertAllAllocatedQueueMetrics) and then runs the checker against the
 * user source of {@code queueInfo} and of its root queue.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertAllAllocatedMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllAllocatedQueueMetrics(queueInfo, checker, metricsType,
      expectedCustomResourceValues);

  // User-level sources: first the given queue's, then the root queue's.
  final ResourceMetricsChecker queueUserChecker =
      ResourceMetricsChecker.createFromChecker(checker)
          .checkAgainst(queueInfo.userSource);
  ResourceMetricsChecker.createFromChecker(queueUserChecker)
      .checkAgainst(queueInfo.getRoot().userSource);
}
/**
 * Verifies allocated metrics on the queue sources only; user sources are
 * not checked. Delegates to assertAllAllocatedQueueMetrics.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertQueueAllocatedMetricsOnly(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllAllocatedQueueMetrics(
      queueInfo, checker, metricsType, expectedCustomResourceValues);
}
/**
 * Runs the checker against all queue sources of {@code queueInfo}, then
 * verifies the allocated value of CUSTOM_RES_1 and CUSTOM_RES_2 on the
 * queue and each of its ancestors.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource;
 *     both custom resource names are looked up in it
 */
private void assertAllAllocatedQueueMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  // Normal (memory / vcores / containers) metrics first.
  queueInfo.checkAllQueueSources(checker);

  // Then the custom resource metrics, one resource after the other.
  for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    checkAllAllocatedQueueMetrics(queueInfo, metricsType, resourceName,
        expectedCustomResourceValues.get(resourceName));
  }
}
/**
 * Asserts that the allocated amount of {@code resName} equals
 * {@code resValue} on {@code queueInfo} and on every queue reachable
 * through getParentQueueInfo().
 *
 * @param queueInfo first queue to check
 * @param metricsType metrics kind, used in failure messages
 * @param resName name of the custom resource
 * @param resValue expected allocated value on every visited queue
 */
private void checkAllAllocatedQueueMetrics(QueueInfo queueInfo,
    MetricsForCustomResource metricsType, String resName, long resValue) {
  QueueInfo current = queueInfo;
  // Walk up the parent chain; the starting queue is always checked.
  do {
    assertCustomResourceValue(metricsType,
        current.queueMetrics.getAllocatedResources(), resName, resValue);
    current = current.getParentQueueInfo();
  } while (current != null);
}
/**
 * Verifies preempted-seconds metrics on the queue sources (via
 * assertAllPreemptedQueueMetrics) and then runs the checker against the
 * user source of {@code queueInfo} and of its root queue.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertAllPreemptedMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllPreemptedQueueMetrics(queueInfo, checker, metricsType,
      expectedCustomResourceValues);

  // User-level sources: first the given queue's, then the root queue's.
  final ResourceMetricsChecker queueUserChecker =
      ResourceMetricsChecker.createFromChecker(checker)
          .checkAgainst(queueInfo.userSource);
  ResourceMetricsChecker.createFromChecker(queueUserChecker)
      .checkAgainst(queueInfo.getRoot().userSource);
}
/**
 * Verifies preempted-seconds metrics on the queue sources only; user
 * sources are not checked. Delegates to assertAllPreemptedQueueMetrics.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertQueuePreemptedMetricsOnly(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllPreemptedQueueMetrics(
      queueInfo, checker, metricsType, expectedCustomResourceValues);
}
/**
 * Runs the checker against all queue sources of {@code queueInfo}, then
 * verifies the preempted-seconds value of CUSTOM_RES_1 and CUSTOM_RES_2 on
 * the queue and each of its ancestors.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource;
 *     both custom resource names are looked up in it
 */
private void assertAllPreemptedQueueMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  // Normal (memory / vcores) metrics first.
  queueInfo.checkAllQueueSources(checker);

  // Then the custom resource metrics, one resource after the other.
  for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    checkAllPreemptedQueueMetrics(queueInfo, metricsType, resourceName,
        expectedCustomResourceValues.get(resourceName));
  }
}
/**
 * Asserts that the preempted-seconds amount of {@code resName} equals
 * {@code resValue} on {@code queueInfo} and on every queue reachable
 * through getParentQueueInfo(). The metrics of each queue are first
 * converted with convertPreemptedSecondsToResource.
 *
 * @param queueInfo first queue to check
 * @param metricsType metrics kind, used in failure messages
 * @param resName name of the custom resource
 * @param resValue expected preempted-seconds value on every visited queue
 */
private void checkAllPreemptedQueueMetrics(QueueInfo queueInfo,
    MetricsForCustomResource metricsType, String resName, long resValue) {
  QueueInfo current = queueInfo;
  // Walk up the parent chain; the starting queue is always checked.
  do {
    final Resource preempted =
        convertPreemptedSecondsToResource(current.queueMetrics);
    assertCustomResourceValue(metricsType, preempted, resName, resValue);
    current = current.getParentQueueInfo();
  } while (current != null);
}
/**
 * Verifies reserved metrics on the queue sources (via
 * assertAllReservedQueueMetrics) and then runs the checker against the
 * user source of {@code queueInfo} and of its root queue.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertAllReservedMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllReservedQueueMetrics(queueInfo, checker, metricsType,
      expectedCustomResourceValues);

  // User-level sources: first the given queue's, then the root queue's.
  final ResourceMetricsChecker queueUserChecker =
      ResourceMetricsChecker.createFromChecker(checker)
          .checkAgainst(queueInfo.userSource);
  ResourceMetricsChecker.createFromChecker(queueUserChecker)
      .checkAgainst(queueInfo.getRoot().userSource);
}
/**
 * Verifies reserved metrics on the queue sources only; user sources are
 * not checked. Delegates to assertAllReservedQueueMetrics.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource
 */
private void assertQueueReservedMetricsOnly(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  assertAllReservedQueueMetrics(
      queueInfo, checker, metricsType, expectedCustomResourceValues);
}
/**
 * Runs the checker against all queue sources of {@code queueInfo}, then
 * verifies the reserved value of CUSTOM_RES_1 and CUSTOM_RES_2 on the
 * queue and each of its ancestors.
 *
 * @param queueInfo queue to start the verification from
 * @param checker expected values of the normal resource metrics
 * @param metricsType metrics kind, used in failure messages
 * @param expectedCustomResourceValues expected value per custom resource;
 *     both custom resource names are looked up in it
 */
private void assertAllReservedQueueMetrics(QueueInfo queueInfo,
    ResourceMetricsChecker checker,
    MetricsForCustomResource metricsType,
    Map<String, Long> expectedCustomResourceValues) {
  // Normal (memory / vcores / containers) metrics first.
  queueInfo.checkAllQueueSources(checker);

  // Then the custom resource metrics, one resource after the other.
  for (String resourceName : new String[] {CUSTOM_RES_1, CUSTOM_RES_2}) {
    checkAllReservedQueueMetrics(queueInfo, metricsType, resourceName,
        expectedCustomResourceValues.get(resourceName));
  }
}
/**
 * Asserts that the reserved amount of {@code resName} equals
 * {@code resValue} on {@code queueInfo} and on every queue reachable
 * through getParentQueueInfo().
 *
 * @param queueInfo first queue to check
 * @param metricsType metrics kind, used in failure messages
 * @param resName name of the custom resource
 * @param resValue expected reserved value on every visited queue
 */
private void checkAllReservedQueueMetrics(QueueInfo queueInfo,
    MetricsForCustomResource metricsType, String resName, long resValue) {
  QueueInfo current = queueInfo;
  // Walk up the parent chain; the starting queue is always checked.
  do {
    assertCustomResourceValue(metricsType,
        current.queueMetrics.getReservedResources(), resName, resValue);
    current = current.getParentQueueInfo();
  } while (current != null);
}
/**
 * Sets the available resources of a single queue through the one-argument
 * setAvailableResourcesToQueue and verifies the available memory / vcore
 * gauges and both custom resource values.
 */
@Test
public void testSetAvailableResourcesToQueue1() {
  final String queueName = "single";
  final QueueMetrics metrics =
      QueueMetrics.forQueue(ms, queueName, null, false, CONF);
  final MetricsSource source = queueSource(ms, queueName);

  final Resource toSet = newResource(GB, 4, ImmutableMap.of(
      CUSTOM_RES_1, String.valueOf(5 * GB),
      CUSTOM_RES_2, String.valueOf(6 * GB)));
  metrics.setAvailableResourcesToQueue(toSet);

  ResourceMetricsChecker.create()
      .gaugeLong(AVAILABLE_MB, GB)
      .gaugeInt(AVAILABLE_V_CORES, 4)
      .checkAgainst(source);

  assertCustomResourceValue(MetricsForCustomResource.AVAILABLE,
      metrics.getAvailableResources(), CUSTOM_RES_1, 5 * GB);
  assertCustomResourceValue(MetricsForCustomResource.AVAILABLE,
      metrics.getAvailableResources(), CUSTOM_RES_2, 6 * GB);
}
/**
 * Sets the available resources of a queue through the two-argument
 * {@code setAvailableResourcesToQueue}, passing {@code null} as the first
 * argument, and verifies the available memory, vcores and both custom
 * resource values.
 */
@Test
public void testSetAvailableResourcesToQueue2() {
  final String queueName = "single";
  final QueueMetrics metrics =
      QueueMetrics.forQueue(ms, queueName, null, false, CONF);
  final MetricsSource queueSource = queueSource(ms, queueName);

  final Map<String, String> customResources =
      ImmutableMap.<String, String> builder()
          .put(CUSTOM_RES_1, String.valueOf(15 * GB))
          .put(CUSTOM_RES_2, String.valueOf(20 * GB))
          .build();
  final Resource available = newResource(GB, 4, customResources);
  metrics.setAvailableResourcesToQueue(null, available);

  // normal resources
  ResourceMetricsChecker.create()
      .gaugeLong(AVAILABLE_MB, GB)
      .gaugeInt(AVAILABLE_V_CORES, 4)
      .checkAgainst(queueSource);

  // custom resources
  assertCustomResourceValue(MetricsForCustomResource.AVAILABLE,
      metrics.getAvailableResources(), CUSTOM_RES_1, 15 * GB);
  assertCustomResourceValue(MetricsForCustomResource.AVAILABLE,
      metrics.getAvailableResources(), CUSTOM_RES_2, 20 * GB);
}
/**
 * Builds test data with 5 containers on the basic queue hierarchy and runs
 * the shared {@code testIncreasePendingResources(testData)} scenario.
 */
@Test
public void testIncreasePendingResources() {
  final Resource resourceToDecrease =
      newResource(GB, 2, getCustomResourcesWithValue(2 * GB));

  final QueueMetricsTestData pendingTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResourceToDecrease(resourceToDecrease, 2)
          .withResources(defaultResource)
          .build();

  testIncreasePendingResources(pendingTestData);
}
/**
 * Increases the pending resources for 5 containers, decreases them again for
 * 2 containers and verifies the remaining pending containers, memory, vcores
 * and custom resource values.
 */
@Test
public void testDecreasePendingResources() {
  final int containers = 5;
  final int containersToDecrease = 2;
  final Resource resourceToDecrease =
      newResource(GB, 2, getCustomResourcesWithValue(2 * GB));

  // NOTE(review): the builder is given `containers` here rather than
  // `containersToDecrease`; kept exactly as it was - confirm this is intended.
  final QueueMetricsTestData testData =
      createQueueMetricsTestDataWithContainers(containers)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResourceToDecrease(resourceToDecrease, containers)
          .withResources(defaultResource)
          .build();

  // expected values after the decrease: (per container * containers)
  // minus (decrease per container * containers to decrease)
  final long memoryMBToDecrease = resourceToDecrease.getMemorySize();
  final int vCoresToDecrease = resourceToDecrease.getVirtualCores();
  final int expectedPendingContainers = containers - containersToDecrease;
  final long expectedPendingMB =
      (defaultResource.getMemorySize() * containers)
          - (memoryMBToDecrease * containersToDecrease);
  final int expectedPendingVCores =
      (defaultResource.getVirtualCores() * containers)
          - (vCoresToDecrease * containersToDecrease);

  // there has to be something pending before it can be decreased
  testIncreasePendingResources(testData);

  // decrease
  final Resource decrease = ResourceTypesTestHelper.newResource(
      memoryMBToDecrease, vCoresToDecrease,
      extractCustomResourcesAsStrings(resourceToDecrease));
  testData.leafQueue.queueMetrics.decrPendingResources(testData.partition,
      testData.user, containersToDecrease, decrease);

  // verify normal resources on the leaf queue source
  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(PENDING_CONTAINERS, expectedPendingContainers)
      .gaugeLong(PENDING_MB, expectedPendingMB)
      .gaugeInt(PENDING_V_CORES, expectedPendingVCores)
      .checkAgainst(testData.leafQueue.queueSource);

  // verify custom resources
  final Map<String, Long> expectedPendingCustomResources = new HashMap<>();
  for (Map.Entry<String, Long> customResource
      : testData.customResourceValues.entrySet()) {
    final String name = customResource.getKey();
    final long increased = customResource.getValue() * containers;
    final long decreased =
        resourceToDecrease.getResourceValue(name) * containersToDecrease;
    expectedPendingCustomResources.put(name, increased - decreased);
  }
  assertAllPendingMetrics(testData.leafQueue, checker,
      MetricsForCustomResource.PENDING, expectedPendingCustomResources);
}
/**
 * Runs the shared {@code testAllocateResources} scenario with its first
 * argument set to {@code false}, on test data with 5 containers.
 */
@Test
public void testAllocateResourcesWithoutDecreasePending() {
  final boolean decreasePending = false;
  final QueueMetricsTestData allocationTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResources(defaultResource)
          .build();

  testAllocateResources(decreasePending, allocationTestData);
}
/**
 * Increases the pending resources first and then runs the shared
 * {@code testAllocateResources} scenario with its first argument set to
 * {@code true}.
 */
@Test
public void testAllocateResourcesWithDecreasePending() {
  final Resource resourceToDecrease =
      newResource(GB, 2, getCustomResourcesWithValue(2 * GB));

  final QueueMetricsTestData allocationTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResourceToDecrease(resourceToDecrease, 2)
          .withResources(defaultResource)
          .build();

  // something has to be pending before it can be decreased
  testIncreasePendingResources(allocationTestData);

  // allocate and decrease the pending resources at the same time
  testAllocateResources(true, allocationTestData);
}
/**
 * Allocates resources through the {@code allocateResources} overload that
 * takes no container count and verifies that the pending custom resources
 * are 0 while the allocated ones equal the test data's custom resource
 * values.
 */
@Test
public void testAllocateResourcesWithoutContainer() {
  final QueueMetricsTestData testData = createDefaultQueueMetricsTestData()
      .withLeafQueue(createBasicQueueHierarchy())
      .withResources(defaultResource)
      .build();

  // make sure there is something pending before allocating
  testIncreasePendingResourcesWithoutContainer(testData);

  final Resource allocated = testData.resource;
  testData.leafQueue.queueMetrics.allocateResources(testData.partition,
      testData.user, allocated);

  final ResourceMetricsChecker checker = ResourceMetricsChecker.create()
      .gaugeLong(ALLOCATED_MB, allocated.getMemorySize())
      .gaugeInt(ALLOCATED_V_CORES, allocated.getVirtualCores())
      .gaugeInt(PENDING_CONTAINERS, 1)
      .gaugeLong(PENDING_MB, 0)
      .gaugeInt(PENDING_V_CORES, 0);
  checker.checkAgainst(testData.leafQueue.queueSource);
  checker.checkAgainst(testData.leafQueue.getRoot().queueSource);

  // pending: every custom resource is expected to be 0
  final Map<String, Long> expectedPending = new HashMap<>();
  for (String customResourceName : testData.customResourceValues.keySet()) {
    expectedPending.put(customResourceName, 0L);
  }
  assertAllPendingMetrics(testData.leafQueue, checker,
      MetricsForCustomResource.PENDING, expectedPending);

  // allocated: every custom resource is expected to have its full value
  final Map<String, Long> expectedAllocated = new HashMap<>();
  expectedAllocated.putAll(testData.customResourceValues);
  assertAllAllocatedMetrics(testData.leafQueue, checker,
      MetricsForCustomResource.ALLOCATED, expectedAllocated);
}
/**
 * Allocates resources for 5 containers, releases all of them and verifies
 * the aggregate allocated/released container counters as well as that every
 * allocated custom resource value is 0.
 */
@Test
public void testReleaseResources() {
  final int containers = 5;
  final QueueMetricsTestData testData =
      createQueueMetricsTestDataWithContainers(containers)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResourceToDecrease(defaultResource, containers)
          .withResources(defaultResource)
          .build();

  // there has to be something allocated before it can be released
  testAllocateResources(false, testData);

  testData.leafQueue.queueMetrics.releaseResources(testData.partition,
      testData.user, containers, defaultResource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers)
      .counter(AGGREGATE_CONTAINERS_RELEASED, containers)
      .checkAgainst(testData.leafQueue.queueSource);

  final Map<String, Long> expectedAllocated = new HashMap<>();
  for (String customResourceName : testData.customResourceValues.keySet()) {
    expectedAllocated.put(customResourceName, 0L);
  }
  assertAllAllocatedMetrics(testData.leafQueue, checker,
      MetricsForCustomResource.ALLOCATED, expectedAllocated);
}
/**
 * Runs the shared {@code testUpdatePreemptedSeconds} scenario with 1 second
 * on the hierarchy built by {@code createFourLevelQueueHierarchy()}.
 */
@Test
public void testUpdatePreemptedSecondsForCustomResources() {
  final int preemptedSeconds = 1;
  final QueueMetricsTestData preemptionTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createFourLevelQueueHierarchy())
          .withResources(defaultResource)
          .build();

  testUpdatePreemptedSeconds(preemptionTestData, preemptedSeconds);
}
/**
 * Runs the shared {@code testUpdatePreemptedSeconds} scenario with 15
 * seconds on the hierarchy built by {@code createFourLevelQueueHierarchy()}.
 */
@Test
public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() {
  final int preemptedSeconds = 15;
  final QueueMetricsTestData preemptionTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createFourLevelQueueHierarchy())
          .withResources(defaultResource)
          .build();

  testUpdatePreemptedSeconds(preemptionTestData, preemptedSeconds);
}
/**
 * Builds test data with 5 containers on the basic queue hierarchy and runs
 * the shared {@code testReserveResources(testData)} scenario.
 */
@Test
public void testReserveResources() {
  final QueueMetricsTestData reservationTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResources(defaultResource)
          .build();

  testReserveResources(reservationTestData);
}
/**
 * Reserves resources, unreserves {@code defaultResource} again and verifies
 * that the reserved containers, memory, vcores and custom resource values
 * are all 0 afterwards.
 */
@Test
public void testUnreserveResources() {
  final QueueMetricsTestData testData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResources(defaultResource)
          .build();

  // there has to be a reservation before it can be removed
  testReserveResources(testData);

  testData.leafQueue.queueMetrics.unreserveResource(testData.partition,
      testData.user, defaultResource);

  final ResourceMetricsChecker checker = ResourceMetricsChecker
      .create()
      .gaugeInt(RESERVED_CONTAINERS, 0)
      .gaugeLong(RESERVED_MB, 0)
      .gaugeInt(RESERVED_V_CORES, 0)
      .checkAgainst(testData.leafQueue.queueSource);

  final Map<String, Long> expectedReserved = new HashMap<>();
  for (String customResourceName : testData.customResourceValues.keySet()) {
    expectedReserved.put(customResourceName, 0L);
  }
  assertAllReservedMetrics(testData.leafQueue, checker,
      MetricsForCustomResource.RESERVED, expectedReserved);
}
/**
 * Runs the shared {@code testGetAllocatedResources} scenario on test data
 * built with {@code defaultResource}.
 */
@Test
public void testGetAllocatedResourcesWithCustomResources() {
  final QueueMetricsTestData allocatedTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withLeafQueue(createBasicQueueHierarchy())
          .withResources(defaultResource)
          .build();

  testGetAllocatedResources(allocatedTestData);
}
/**
 * Runs the shared {@code testGetAllocatedResources} scenario on test data
 * whose resource is created with an empty custom resource map.
 */
@Test
public void testGetAllocatedResourcesWithoutCustomResources() {
  final Map<String, String> noCustomResources =
      Collections.<String, String>emptyMap();

  final QueueMetricsTestData allocatedTestData =
      createQueueMetricsTestDataWithContainers(5)
          .withResources(newResource(4 * GB, 4, noCustomResources))
          .withLeafQueue(createBasicQueueHierarchy())
          .build();

  testGetAllocatedResources(allocatedTestData);
}
}