YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R

This commit is contained in:
Jonathan Hung 2020-06-01 10:48:41 -07:00
parent 73ae3cee85
commit b9a0f99966
16 changed files with 1713 additions and 229 deletions

View File

@ -305,6 +305,19 @@ public class Resources {
return lhs;
}
/**
* Subtract {@code rhs} from {@code lhs} and reset any negative values to
* zero. This call will operate on a copy of {@code lhs}, leaving {@code lhs}
* unmodified.
*
* @param lhs {@link Resource} to subtract from
* @param rhs {@link Resource} to subtract
* @return the value of lhs after subtraction
*/
public static Resource subtractNonNegative(Resource lhs, Resource rhs) {
return subtractFromNonNegative(clone(lhs), rhs);
}
public static Resource negate(Resource resource) {
return subtract(NONE, resource);
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@ -90,9 +91,11 @@ public class AppSchedulingInfo {
public final ContainerUpdateContext updateContext;
private final RMContext rmContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, AbstractUsersManager abstractUsersManager,
long epoch, ResourceUsage appResourceUsage) {
long epoch, ResourceUsage appResourceUsage, RMContext rmContext) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
@ -106,6 +109,7 @@ public class AppSchedulingInfo {
updateContext = new ContainerUpdateContext(this);
readLock = lock.readLock();
writeLock = lock.writeLock();
this.rmContext = rmContext;
}
public ApplicationId getApplicationId() {
@ -437,7 +441,7 @@ public class AppSchedulingInfo {
public List<ResourceRequest> allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
RMContainer containerAllocated) {
try {
writeLock.lock();
@ -593,7 +597,7 @@ public class AppSchedulingInfo {
}
private void updateMetricsForAllocatedContainer(NodeType type,
SchedulerNode node, Container containerAllocated) {
SchedulerNode node, RMContainer containerAllocated) {
QueueMetrics metrics = queue.getMetrics();
if (pending) {
// once an allocation is done we assume the application is
@ -604,14 +608,16 @@ public class AppSchedulingInfo {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: applicationId=" + applicationId + " container="
+ containerAllocated.getId() + " host=" + containerAllocated
.getNodeId().toString() + " user=" + user + " resource="
+ containerAllocated.getResource() + " type="
+ type);
+ containerAllocated.getContainer().getId() + " host="
+ containerAllocated.getContainer().getNodeId().toString() + " user="
+ user + " resource="
+ containerAllocated.getContainer().getResource() + " type=" + type);
}
if(node != null) {
if (node != null) {
metrics.allocateResources(node.getPartition(), user, 1,
containerAllocated.getResource(), true);
containerAllocated.getContainer().getResource(), false);
metrics.decrPendingResources(containerAllocated.getNodeLabelExpression(),
user, 1, containerAllocated.getContainer().getResource());
}
metrics.incrNodeTypeAggregations(user, type);
}
@ -657,4 +663,8 @@ public class AppSchedulingInfo {
this.readLock.unlock();
}
}
public RMContext getRMContext() {
return this.rmContext;
}
}

View File

@ -162,10 +162,16 @@ public class ContainerUpdateContext {
// Decrement the pending using a dummy RR with
// resource = prev update req capability
if (prevReq != null) {
Container container = Container.newInstance(UNDEFINED,
schedulerNode.getNodeID(), "host:port", prevReq.getCapability(),
schedulerKey.getPriority(), null);
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
schedulerKey, Container.newInstance(UNDEFINED,
schedulerNode.getNodeID(), "host:port",
prevReq.getCapability(), schedulerKey.getPriority(), null));
schedulerKey,
new RMContainerImpl(container, schedulerKey,
appSchedulingInfo.getApplicationAttemptId(),
schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
appSchedulingInfo.getRMContext(),
schedulingPlacementSet.getPrimaryRequestedNodePartition()));
}
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@Metrics(context = "yarn")
public class PartitionQueueMetrics extends QueueMetrics {
private String partition;
protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf,
String partition) {
super(ms, queueName, parent, enableUserMetrics, conf);
this.partition = partition;
if (getParentQueue() != null) {
String newQueueName = (getParentQueue() instanceof CSQueue)
? ((CSQueue) getParentQueue()).getQueuePath()
: getParentQueue().getQueueName();
String parentMetricName =
partition + METRIC_NAME_DELIMITER + newQueueName;
setParent(getQueueMetrics().get(parentMetricName));
}
}
/**
* Partition * Queue * User Metrics
*
* Computes Metrics at Partition (Node Label) * Queue * User Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* QueueMetrics (A)
* usermetrics
* QueueMetrics (A1)
* usermetrics
* QueueMetrics (A2)
* usermetrics
* QueueMetrics (B)
* usermetrics
*
* @return QueueMetrics
*/
@Override
public synchronized QueueMetrics getUserMetrics(String userName) {
if (users == null) {
return null;
}
String partitionJMXStr =
(partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
: partition;
QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
if (metrics == null) {
metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
null, false, this.conf, this.partition);
users.put(userName, metrics);
metricsSystem.register(
pSourceName(partitionJMXStr).append(qSourceName(queueName))
.append(",user=").append(userName).toString(),
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
.tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
}
return metrics;
}
}

View File

@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
@InterfaceAudience.Private
@ -114,15 +114,19 @@ public class QueueMetrics implements MetricsSource {
info("Queue", "Metrics by queue");
protected static final MetricsInfo USER_INFO =
info("User", "Metrics by user");
protected static final MetricsInfo PARTITION_INFO =
info("Partition", "Metrics by partition");
static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults();
protected final MetricsRegistry registry;
protected final String queueName;
protected final QueueMetrics parent;
private QueueMetrics parent;
private final Queue parentQueue;
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
private final boolean enableUserMetrics;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
@ -150,6 +154,18 @@ public class QueueMetrics implements MetricsSource {
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME";
protected static final MetricsInfo P_RECORD_INFO =
info("PartitionQueueMetrics", "Metrics for the resource scheduler");
// Use "default" to operate NO_LABEL (default) partition internally
public static final String DEFAULT_PARTITION = "default";
// Use "" to register NO_LABEL (default) partition into metrics system
public static final String DEFAULT_PARTITION_JMX_STR = "";
// Metric Name Delimiter
public static final String METRIC_NAME_DELIMITER = ".";
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
registry = new MetricsRegistry(RECORD_INFO);
@ -157,6 +173,8 @@ public class QueueMetrics implements MetricsSource {
this.parent = parent != null ? parent.getMetrics() : null;
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
: null;
this.parentQueue = parent;
this.enableUserMetrics = enableUserMetrics;
metricsSystem = ms;
this.conf = conf;
runningTime = buildBuckets(conf);
@ -178,12 +196,25 @@ public class QueueMetrics implements MetricsSource {
return sb;
}
public synchronized
static QueueMetrics forQueue(String queueName, Queue parent,
boolean enableUserMetrics,
Configuration conf) {
static StringBuilder pSourceName(String partition) {
StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
sb.append(",partition").append('=').append(partition);
return sb;
}
static StringBuilder qSourceName(String queueName) {
StringBuilder sb = new StringBuilder();
int i = 0;
for (String node : Q_SPLITTER.split(queueName)) {
sb.append(",q").append(i++).append('=').append(node);
}
return sb;
}
public synchronized static QueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
enableUserMetrics, conf);
enableUserMetrics, conf);
}
/**
@ -209,24 +240,20 @@ public class QueueMetrics implements MetricsSource {
return QUEUE_METRICS;
}
public synchronized
static QueueMetrics forQueue(MetricsSystem ms, String queueName,
Queue parent, boolean enableUserMetrics,
Configuration conf) {
QueueMetrics metrics = QUEUE_METRICS.get(queueName);
public synchronized static QueueMetrics forQueue(MetricsSystem ms,
String queueName, Queue parent, boolean enableUserMetrics,
Configuration conf) {
QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
metrics =
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
tag(QUEUE_INFO, queueName);
metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
.tag(QUEUE_INFO, queueName);
// Register with the MetricsSystems
if (ms != null) {
metrics =
ms.register(
sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
metrics = ms.register(sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
}
QUEUE_METRICS.put(queueName, metrics);
getQueueMetrics().put(queueName, metrics);
}
return metrics;
@ -238,7 +265,8 @@ public class QueueMetrics implements MetricsSource {
}
QueueMetrics metrics = users.get(userName);
if (metrics == null) {
metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
metrics =
new QueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),
@ -248,6 +276,96 @@ public class QueueMetrics implements MetricsSource {
return metrics;
}
/**
* Partition * Queue Metrics
*
* Computes Metrics at Partition (Node Label) * Queue Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* QueueMetrics (A)
* metrics
* QueueMetrics (A1)
* metrics
* QueueMetrics (A2)
* metrics
* QueueMetrics (B)
* metrics
*
* @param partition
* @return QueueMetrics
*/
public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
String partitionJMXStr = partition;
if ((partition == null)
|| (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
partition = DEFAULT_PARTITION;
partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
}
String metricName = partition + METRIC_NAME_DELIMITER + this.queueName;
QueueMetrics metrics = getQueueMetrics().get(metricName);
if (metrics == null) {
QueueMetrics queueMetrics =
new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
this.enableUserMetrics, this.conf, partition);
metricsSystem.register(
pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
.toString(),
"Metrics for queue: " + this.queueName,
queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
this.queueName));
getQueueMetrics().put(metricName, queueMetrics);
return queueMetrics;
} else {
return metrics;
}
}
/**
* Partition Metrics
*
* Computes Metrics at Partition (Node Label) Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* metrics
*
* @param partition
* @return QueueMetrics
*/
private QueueMetrics getPartitionMetrics(String partition) {
String partitionJMXStr = partition;
if ((partition == null)
|| (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
partition = DEFAULT_PARTITION;
partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
}
String metricName = partition + METRIC_NAME_DELIMITER;
QueueMetrics metrics = getQueueMetrics().get(metricName);
if (metrics == null) {
metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null,
false, this.conf, partition);
// Register with the MetricsSystems
if (metricsSystem != null) {
metricsSystem.register(pSourceName(partitionJMXStr).toString(),
"Metrics for partition: " + partitionJMXStr,
(PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
partitionJMXStr));
}
getQueueMetrics().put(metricName, metrics);
}
return metrics;
}
private ArrayList<Integer> parseInts(String value) {
ArrayList<Integer> result = new ArrayList<Integer>();
for(String s: value.split(",")) {
@ -388,17 +506,36 @@ public class QueueMetrics implements MetricsSource {
*/
public void setAvailableResourcesToQueue(String partition, Resource limit) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit);
registerCustomResources(
queueMetricsForCustomResources.getAvailableValues(),
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
setAvailableResources(limit);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.setAvailableResources(limit);
if (this.queueName.equals("root")) {
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.setAvailableResources(limit);
}
}
}
}
/**
* Set Available resources with support for resource vectors.
*
* @param limit
*/
public void setAvailableResources(Resource limit) {
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.setAvailable(limit);
registerCustomResources(
queueMetricsForCustomResources.getAvailableValues(),
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
}
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
@ -420,7 +557,15 @@ public class QueueMetrics implements MetricsSource {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAvailableResourcesToQueue(partition, limit);
userMetrics.setAvailableResources(limit);
}
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
QueueMetrics partitionUserMetrics =
partitionQueueMetrics.getUserMetrics(user);
if (partitionUserMetrics != null) {
partitionUserMetrics.setAvailableResources(limit);
}
}
}
@ -435,18 +580,46 @@ public class QueueMetrics implements MetricsSource {
*/
public void incrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_incrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.incrPendingResources(partition, user, containers, res);
}
if (parent != null) {
parent.incrPendingResources(partition, user, containers, res);
internalIncrPendingResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalIncrPendingResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.incrementPendingResources(containers, res);
}
}
}
public void internalIncrPendingResources(String partition, String user,
int containers, Resource res) {
incrementPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalIncrPendingResources(partition, user, containers,
res);
}
if (parent != null) {
parent.internalIncrPendingResources(partition, user, containers, res);
}
}
private void incrementPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
}
protected Map<String, Long> initAndGetCustomResources() {
Map<String, Long> customResources = new HashMap<String, Long>();
ResourceInformation[] resources = ResourceUtils.getResourceTypesArray();
@ -505,32 +678,38 @@ public class QueueMetrics implements MetricsSource {
}
}
private void _incrPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increasePending(res, containers);
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
}
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_decrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.decrPendingResources(partition, user, containers, res);
}
if (parent != null) {
parent.decrPendingResources(partition, user, containers, res);
internalDecrPendingResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalDecrPendingResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.decrementPendingResources(containers, res);
}
}
}
private void _decrPendingResources(int containers, Resource res) {
protected void internalDecrPendingResources(String partition, String user,
int containers, Resource res) {
decrementPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalDecrPendingResources(partition, user, containers,
res);
}
if (parent != null) {
parent.internalDecrPendingResources(partition, user, containers, res);
}
}
private void decrementPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
@ -560,32 +739,59 @@ public class QueueMetrics implements MetricsSource {
}
}
public void allocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
public void allocateResources(String partition, String user, int containers,
Resource res, boolean decrPending) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
internalAllocateResources(partition, user, containers, res, decrPending);
}
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalAllocateResources(partition, user,
containers, res, decrPending);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.computeAllocateResources(containers, res, decrPending);
}
}
}
if (decrPending) {
_decrPendingResources(containers, res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.allocateResources(partition, user,
containers, res, decrPending);
}
if (parent != null) {
parent.allocateResources(partition, user, containers, res, decrPending);
}
public void internalAllocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
computeAllocateResources(containers, res, decrPending);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalAllocateResources(partition, user, containers, res,
decrPending);
}
if (parent != null) {
parent.internalAllocateResources(partition, user, containers, res,
decrPending);
}
}
/**
* Allocate Resources for a partition with support for resource vectors.
*
* @param containers number of containers
* @param res resource containing memory size, vcores etc
* @param decrPending decides whether to decrease pending resource or not
*/
private void computeAllocateResources(int containers, Resource res,
boolean decrPending) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
if (decrPending) {
decrementPendingResources(containers, res);
}
}
@ -596,81 +802,80 @@ public class QueueMetrics implements MetricsSource {
* @param res
*/
public void allocateResources(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.increaseAllocated(res);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
registerCustomResources(
queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.allocateResources(partition, user, res);
}
if (parent != null) {
parent.allocateResources(partition, user, res);
}
}
}
public void releaseResources(String partition,
String user, int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(partition, user, containers, res);
}
if (parent != null) {
parent.releaseResources(partition, user, containers, res);
}
}
}
/**
* Release Resource for container size change.
*
* @param user
* @param res
*/
private void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores());
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res);
queueMetricsForCustomResources.increaseAllocated(res);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
pendingMB.decr(res.getMemorySize());
pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
registerCustomResources(
queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, res);
userMetrics.allocateResources(partition, user, res);
}
if (parent != null) {
parent.releaseResources(user, res);
parent.allocateResources(partition, user, res);
}
}
public void releaseResources(String partition, String user, int containers,
Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalReleaseResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalReleaseResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.computeReleaseResources(containers, res);
}
}
}
public void internalReleaseResources(String partition, String user,
int containers, Resource res) {
computeReleaseResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalReleaseResources(partition, user, containers, res);
}
if (parent != null) {
parent.internalReleaseResources(partition, user, containers, res);
}
}
/**
* Release Resources for a partition with support for resource vectors.
*
* @param containers number of containers
* @param res resource containing memory size, vcores etc
*/
private void computeReleaseResources(int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
allocatedVCores.decr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res, containers);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
}
@ -721,11 +926,31 @@ public class QueueMetrics implements MetricsSource {
public void reserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
reserveResource(user, res);
internalReserveResources(partition, user, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalReserveResources(partition, user, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.incrReserveResources(res);
}
}
}
public void reserveResource(String user, Resource res) {
protected void internalReserveResources(String partition, String user,
Resource res) {
incrReserveResources(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalReserveResources(partition, user, res);
}
if (parent != null) {
parent.internalReserveResources(partition, user, res);
}
}
public void incrReserveResources(Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
@ -735,17 +960,37 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.reserveResource(user, res);
}
public void unreserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalUnReserveResources(partition, user, res);
}
if (parent != null) {
parent.reserveResource(user, res);
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalUnReserveResources(partition, user, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.decrReserveResource(res);
}
}
}
private void unreserveResource(String user, Resource res) {
reservedContainers.decr();
protected void internalUnReserveResources(String partition, String user,
Resource res) {
decrReserveResource(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalUnReserveResources(partition, user, res);
}
if (parent != null) {
parent.internalUnReserveResources(partition, user, res);
}
}
public void decrReserveResource(Resource res) {
int containers = 1;
reservedContainers.decr(containers);
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
@ -754,19 +999,6 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.unreserveResource(user, res);
}
if (parent != null) {
parent.unreserveResource(user, res);
}
}
public void unreserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
unreserveResource(user, res);
}
}
public void incrActiveUsers() {
@ -980,4 +1212,12 @@ public class QueueMetrics implements MetricsSource {
QueueMetricsForCustomResources metrics) {
this.queueMetricsForCustomResources = metrics;
}
public void setParent(QueueMetrics parent) {
this.parent = parent;
}
public Queue getParentQueue() {
return parentQueue;
}
}

View File

@ -206,9 +206,9 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
RMContext rmContext) {
Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.appSchedulingInfo = new AppSchedulingInfo(applicationAttemptId, user,
queue, abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage,
this.rmContext);
this.queue = queue;
this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>());

View File

@ -229,7 +229,7 @@ public class CSQueueMetrics extends QueueMetrics {
public synchronized static CSQueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
MetricsSystem ms = DefaultMetricsSystem.instance();
QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
metrics =
new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
@ -241,7 +241,7 @@ public class CSQueueMetrics extends QueueMetrics {
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
+ queueName, metrics);
}
QueueMetrics.getQueueMetrics().put(queueName, metrics);
getQueueMetrics().put(queueName, metrics);
}
return (CSQueueMetrics) metrics;
@ -254,7 +254,8 @@ public class CSQueueMetrics extends QueueMetrics {
}
CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName);
if (metrics == null) {
metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
metrics =
new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),

View File

@ -1370,8 +1370,9 @@ public class LeafQueue extends AbstractCSQueue {
: getQueueMaxResource(partition, clusterResource);
Resource headroom = Resources.componentwiseMin(
Resources.subtract(userLimitResource, user.getUsed(partition)),
Resources.subtract(currentPartitionResourceLimit,
Resources.subtractNonNegative(userLimitResource,
user.getUsed(partition)),
Resources.subtractNonNegative(currentPartitionResourceLimit,
queueUsage.getUsed(partition)));
// Normalize it before return
headroom =
@ -1685,11 +1686,16 @@ public class LeafQueue extends AbstractCSQueue {
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, true);
// Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
Resource partitionHeadroom = Resources.createResource(0, 0);
if (metrics.getUserMetrics(userName) != null) {
partitionHeadroom = getHeadroom(user,
cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
getResourceLimitForActiveUsers(userName, clusterResource,
nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
nodePartition);
}
metrics.setAvailableResourcesToUser(nodePartition, userName,
partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " user=" + userName + " used="
@ -1728,8 +1734,16 @@ public class LeafQueue extends AbstractCSQueue {
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, false);
metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
Resource partitionHeadroom = Resources.createResource(0, 0);
if (metrics.getUserMetrics(userName) != null) {
partitionHeadroom = getHeadroom(user,
cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
getResourceLimitForActiveUsers(userName, clusterResource,
nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
nodePartition);
}
metrics.setAvailableResourcesToUser(nodePartition, userName,
partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(

View File

@ -549,8 +549,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
List<ResourceRequest> requests = appSchedulingInfo.allocate(
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer());
schedulerContainer.getSchedulerRequestKey(), rmContainer);
((RMContainerImpl) rmContainer).setResourceRequests(requests);
attemptResourceUsage.incUsed(schedulerContainer.getNodePartition(),

View File

@ -469,7 +469,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
type, node, schedulerKey, rmContainer);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());

View File

@ -81,7 +81,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
// Update consumption and track allocations
List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
type, node, schedulerKey, rmContainer);
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -47,8 +48,9 @@ public class TestAppSchedulingInfo {
FSLeafQueue queue = mock(FSLeafQueue.class);
doReturn("test").when(queue).getQueueName();
RMContext rmContext = mock(RMContext.class);
AppSchedulingInfo appSchedulingInfo = new AppSchedulingInfo(
appAttemptId, "test", queue, null, 0, new ResourceUsage());
appAttemptId, "test", queue, null, 0, new ResourceUsage(), rmContext);
appSchedulingInfo.updatePlacesBlacklistedByApp(new ArrayList<String>(),
new ArrayList<String>());
@ -118,9 +120,10 @@ public class TestAppSchedulingInfo {
Queue queue = mock(Queue.class);
doReturn(mock(QueueMetrics.class)).when(queue).getMetrics();
RMContext rmContext = mock(RMContext.class);
AppSchedulingInfo info = new AppSchedulingInfo(
appAttemptId, "test", queue, mock(ActiveUsersManager.class), 0,
new ResourceUsage());
new ResourceUsage(), rmContext);
Assert.assertEquals(0, info.getSchedulerKeys().size());
Priority pri1 = Priority.newInstance(1);

View File

@ -0,0 +1,752 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPartitionQueueMetrics {
static final int GB = 1024; // MB
private static final Configuration CONF = new Configuration();
private MetricsSystem ms;
@Before
public void setUp() {
ms = new MetricsSystemImpl();
QueueMetrics.clearQueueMetrics();
PartitionQueueMetrics.clearQueueMetrics();
}
@After
public void tearDown() {
ms.shutdown();
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in only 1 partition, x.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test
public void testSinglePartitionWithSingleLevelQueueMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
String user = "alice";
QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
MetricsSource q2Source = queueSource(ms, "x", "root.q2");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in both partitions, x & y.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test
public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception {
String parentQueueName = "root";
String user = "alice";
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource xPartitionSource = partitionSource(ms, "x");
MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
root.setAvailableResourcesToQueue("y",
Resources.createResource(400 * GB, 400));
q2.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1));
MetricsSource yPartitionSource = partitionSource(ms, "y");
MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
MetricsSource q2Source = queueSource(ms, "y", "root.q2");
checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
}
/**
* Structure:
* Both queues, q1 has been configured to run in multiple partitions, x & y.
*
* root
* /
* q1
*
* @throws Exception
*/
@Test
public void testMultiplePartitionWithSingleQueueMetrics() throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
root.setAvailableResourcesToQueue("y",
Resources.createResource(300 * GB, 300));
q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1));
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
q1.incrPendingResources("x", "test_user1", 4,
Resource.newInstance(1024, 1));
MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9);
checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4);
q1.incrPendingResources("y", "test_user1", 6,
Resource.newInstance(1024, 1));
MetricsSource partitionSourceY = partitionSource(ms, "y");
MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName);
MetricsSource q1SourceY = queueSource(ms, "y", "root.q1");
MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1");
checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in both partitions, x & y.
*
* root
* / \
* q1 q2
* q1
* / \
* q11 q12
* q2
* / \
* q21 q22
*
* @throws Exception
*/
@Test
public void testMultiplePartitionsWithMultiLevelQueuesMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(root);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
Queue childQueue1 = mock(Queue.class);
when(childQueue1.getQueueName()).thenReturn("root.q1");
when(childQueue1.getMetrics()).thenReturn(q1);
QueueMetrics q11 =
QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF);
QueueMetrics q12 =
QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
Queue childQueue2 = mock(Queue.class);
when(childQueue2.getQueueName()).thenReturn("root.q2");
when(childQueue2.getMetrics()).thenReturn(q2);
QueueMetrics q21 =
QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF);
QueueMetrics q22 =
QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q11.setAvailableResourcesToQueue("x",
Resources.createResource(50 * GB, 50));
q11.incrPendingResources("x", "test_user", 2,
Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2);
q11.incrPendingResources("x", "test_user", 4,
Resource.newInstance(1024, 1));
MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
q11.incrPendingResources("x", "test_user1", 5,
Resource.newInstance(1024, 1));
MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1");
MetricsSource userSource1 =
userSource(ms, "x", "test_user1", "root.q1.q11");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
q12.incrPendingResources("x", "test_user", 5,
Resource.newInstance(1024, 1));
MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16);
checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
root.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("y",
Resources.createResource(100 * GB, 100));
q12.setAvailableResourcesToQueue("y",
Resources.createResource(50 * GB, 50));
q12.incrPendingResources("y", "test_user", 3,
Resource.newInstance(1024, 1));
MetricsSource yPartitionSource = partitionSource(ms, "y");
MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
MetricsSource q1YSource = queueSource(ms, "y", "root.q1");
MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3);
root.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q2.setAvailableResourcesToQueue("y",
Resources.createResource(100 * GB, 100));
q21.setAvailableResourcesToQueue("y",
Resources.createResource(50 * GB, 50));
q21.incrPendingResources("y", "test_user", 5,
Resource.newInstance(1024, 1));
MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21");
MetricsSource q2YSource = queueSource(ms, "y", "root.q2");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5);
checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5);
q22.incrPendingResources("y", "test_user", 6,
Resource.newInstance(1024, 1));
MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
}
@Test
public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String user = "alice";
String partition = "x";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
metrics.submitAppAttempt(user);
parentMetrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
metrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
parentMetrics.setAvailableResourcesToUser(partition, user,
Resources.createResource(10 * GB, 10));
metrics.setAvailableResourcesToUser(partition, user,
Resources.createResource(10 * GB, 10));
metrics.incrPendingResources(partition, user, 6,
Resources.createResource(3 * GB, 3));
MetricsSource partitionSource = partitionSource(ms, partition);
MetricsSource parentQueueSource =
queueSource(ms, partition, parentQueueName);
MetricsSource queueSource = queueSource(ms, partition, leafQueueName);
MetricsSource userSource = userSource(ms, partition, user, leafQueueName);
MetricsSource userSource1 =
userSource(ms, partition, user, parentQueueName);
checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0,
0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0,
0);
checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
0, 0);
checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
metrics.reserveResource(partition, user,
Resources.createResource(3 * GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15,
3, 3 * GB, 3, 1);
checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100,
15 * GB, 15, 3, 3 * GB, 3, 1);
checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB,
15, 3, 3 * GB, 3, 1);
checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
3 * GB, 3, 1);
checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
3 * GB, 3, 1);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12,
0, 3 * GB, 3, 1);
checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100,
12 * GB, 12, 0, 3 * GB, 3, 1);
metrics.releaseResources(partition, user, 1,
Resources.createResource(2 * GB, 2));
metrics.unreserveResource(partition, user,
Resources.createResource(3 * GB, 3));
checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12,
0, 0, 0, 0);
checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100,
12 * GB, 12, 0, 0, 0, 0);
checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB,
12, 0, 0, 0, 0);
checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
0, 0, 0);
checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
0, 0, 0);
metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
metrics.finishApp(user, RMAppState.FINISHED);
}
@Test
public void testThreeLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String leafQueueName1 = "root.leaf.leaf1";
String user = "alice";
String partitionX = "x";
String partitionY = "y";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(parentQueueName, null, true, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF);
Queue leafQueue = mock(Queue.class);
when(leafQueue.getQueueName()).thenReturn(leafQueueName);
when(leafQueue.getMetrics()).thenReturn(metrics);
QueueMetrics metrics1 =
QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics1.submitApp(user);
metrics1.submitAppAttempt(user);
parentMetrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(200 * GB, 200));
parentMetrics.setAvailableResourcesToQueue(partitionY,
Resources.createResource(500 * GB, 500));
metrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(100 * GB, 100));
metrics.setAvailableResourcesToQueue(partitionY,
Resources.createResource(400 * GB, 400));
metrics1.setAvailableResourcesToQueue(partitionX,
Resources.createResource(50 * GB, 50));
metrics1.setAvailableResourcesToQueue(partitionY,
Resources.createResource(300 * GB, 300));
parentMetrics.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(20 * GB, 20));
parentMetrics.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(50 * GB, 50));
metrics.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(10 * GB, 10));
metrics.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(40 * GB, 40));
metrics1.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(5 * GB, 5));
metrics1.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(30 * GB, 30));
metrics1.incrPendingResources(partitionX, user, 6,
Resources.createResource(3 * GB, 3));
metrics1.incrPendingResources(partitionY, user, 6,
Resources.createResource(4 * GB, 4));
MetricsSource partitionSourceX =
partitionSource(metrics1.getMetricsSystem(), partitionX);
MetricsSource parentQueueSourceWithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName);
MetricsSource queueSourceWithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName);
MetricsSource queueSource1WithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1);
MetricsSource parentUserSourceWithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, parentQueueName);
MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, leafQueueName);
MetricsSource userSource1WithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, leafQueueName1);
checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18,
6, 0, 0, 0);
checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB,
18, 6, 0, 0, 0);
checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6,
0, 0, 0);
checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB, 18, 6,
0, 0, 0);
checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20, 18 * GB, 18,
6, 0, 0, 0);
checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
0, 0);
checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18, 6, 0,
0, 0);
MetricsSource partitionSourceY =
partitionSource(metrics1.getMetricsSystem(), partitionY);
MetricsSource parentQueueSourceWithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName);
MetricsSource queueSourceWithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName);
MetricsSource queueSource1WithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1);
MetricsSource parentUserSourceWithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, parentQueueName);
MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, leafQueueName);
MetricsSource userSource1WithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, leafQueueName1);
checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24,
6, 0, 0, 0);
checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB,
24, 6, 0, 0, 0);
checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400, 24 * GB, 24, 6,
0, 0, 0);
checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300, 24 * GB, 24, 6,
0, 0, 0);
checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50, 24 * GB, 24,
6, 0, 0, 0);
checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB, 24, 6, 0,
0, 0);
checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB, 24, 6, 0,
0, 0);
metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
metrics1.finishApp(user, RMAppState.FINISHED);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in only 1 partition, x
* UserMetrics has been disabled, hence trying to access the user source
* throws NPE from sources.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test(expected = NullPointerException.class)
public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
String user = "alice";
QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
CSQueueMetrics q1 =
CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF);
CSQueueMetrics q2 =
CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x");
MetricsSource rootQueueSource =
queueSource(q1.getMetricsSystem(), "x", parentQueueName);
MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1");
MetricsSource q1UserSource =
userSource(q1.getMetricsSystem(), "x", user, "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2");
MetricsSource q2UserSource =
userSource(q1.getMetricsSystem(), "x", user, "root.q2");
checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser());
q1.finishApp(user, RMAppState.FINISHED);
}
public static MetricsSource partitionSource(MetricsSystem ms,
String partition) {
MetricsSource s =
ms.getSource(QueueMetrics.pSourceName(partition).toString());
return s;
}
public static MetricsSource queueSource(MetricsSystem ms, String partition,
String queue) {
MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition)
.append(QueueMetrics.qSourceName(queue)).toString());
return s;
}
public static MetricsSource userSource(MetricsSystem ms, String partition,
String user, String queue) {
MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition)
.append(QueueMetrics.qSourceName(queue)).append(",user=")
.append(user).toString());
return s;
}
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long availableMB, int availableCores,
long pendingMB, int pendingCores, int pendingCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
}
private static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
when(app.getApplicationAttemptId()).thenReturn(id);
return app;
}
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, int availableCores,
long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB,
int reservedCores, int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb);
}
}

View File

@ -93,7 +93,7 @@ public class TestSchedulerApplicationAttempt {
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
toSchedulerKey(requestedPriority), container1.getContainer());
toSchedulerKey(requestedPriority), container1);
// Active user count has to decrease from queue2 due to app has NO pending requests
assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers());
@ -135,7 +135,7 @@ public class TestSchedulerApplicationAttempt {
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
toSchedulerKey(requestedPriority), container1.getContainer());
toSchedulerKey(requestedPriority), container1);
// Reserved container
Priority prio1 = Priority.newInstance(1);

View File

@ -1204,9 +1204,9 @@ public class TestLeafQueue {
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
null, null);
"", null);
qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
null, null);
"", null);
qb.setUserLimit(50);
qb.setUserLimitFactor(1);

View File

@ -25,9 +25,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -47,11 +49,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -2032,10 +2037,9 @@ public class TestNodeLabelContainerAllocation {
assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
// The total memory tracked by QueueMetrics is 0GB for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB()
+ rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
@ -2084,6 +2088,8 @@ public class TestNodeLabelContainerAllocation {
csConf.setCapacityByLabel(queueB, "x", 50);
csConf.setMaximumCapacityByLabel(queueB, "x", 50);
csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
// set node -> label
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", false)));
@ -2102,6 +2108,54 @@ public class TestNodeLabelContainerAllocation {
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
double delta = 0.0001;
CSQueue leafQueue = cs.getQueue("a");
CSQueue leafQueueB = cs.getQueue("b");
CSQueue rootQueue = cs.getRootQueue();
assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem();
QueueMetrics partXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
QueueMetrics partDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
QueueMetrics queueAMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
QueueMetrics queueBMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b");
QueueMetrics queueAPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
QueueMetrics queueAPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
QueueMetrics queueBPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b");
QueueMetrics queueBPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b");
QueueMetrics rootMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta);
assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
@ -2109,47 +2163,73 @@ public class TestNodeLabelContainerAllocation {
// app1 asks for 3 partition= containers
am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
// NM1 do 50 heartbeats
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// app1 gets all resource in partition=x (non-exclusive)
Assert.assertEquals(3, schedulerNode1.getNumContainers());
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(7 * GB,
reportNm1.getAvailableResource().getMemorySize());
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(9 * GB,
reportNm2.getAvailableResource().getMemorySize());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
// 3GB is used from label x quota. 1.5 GB is remaining from default label.
// 2GB is remaining from label x.
assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta);
assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta);
assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta);
assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta);
QueueMetrics partDefaultQueueAUserMetrics =
(QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user",
"root.a");
QueueMetrics partXQueueAUserMetrics =
(QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user",
"root.a");
QueueMetrics queueAUserMetrics =
(QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user");
assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta);
assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
// app1 asks for 1 default partition container
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
// NM2 do couple of heartbeats
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// app1 gets all resource in default partition
Assert.assertEquals(2, schedulerNode2.getNumContainers());
Assert.assertEquals(3, schedulerNode1.getNumContainers());
// 3GB is used from label x quota. 2GB used from default label.
// So 0.5 GB is remaining from default label.
@ -2158,10 +2238,100 @@ public class TestNodeLabelContainerAllocation {
// The total memory tracked by QueueMetrics is 10GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
delta);
assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
// Pending Resources when containers are waiting on "default" partition
assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta);
assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
delta);
assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta);
assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
delta);
assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta);
assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
// Pending Resources after containers has been assigned on "x" partition
assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
delta);
assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
rm1.killApp(app1.getApplicationId());
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
assertEquals(2, queueAMetrics.getAggregateAllocatedContainers());
assertEquals(2, queueAMetrics.getAggegatedReleasedContainers());
assertEquals(2, queueAPartDefaultMetrics.getAggregateAllocatedContainers());
assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers());
assertEquals(7, partXMetrics.getAggregateAllocatedContainers());
assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers());
assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers());
assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers());
assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
rm1.close();
}
@ -2278,8 +2448,8 @@ public class TestNodeLabelContainerAllocation {
// The total memory tracked by QueueMetrics is 12GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+ rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
@ -2291,6 +2461,193 @@ public class TestNodeLabelContainerAllocation {
rm1.close();
}
@Test
public void testTwoLevelQueueMetricsWithLabels() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a"});
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(queueA, 100);
csConf.setAccessibleNodeLabels(queueA, toSet("x"));
csConf.setCapacityByLabel(queueA, "x", 100);
csConf.setMaximumCapacityByLabel(queueA, "x", 100);
csConf.setQueues(queueA, new String[] {"a1"});
final String queueA1 = queueA + ".a1";
csConf.setCapacity(queueA1, 100);
csConf.setAccessibleNodeLabels(queueA1, toSet("x"));
csConf.setCapacityByLabel(queueA1, "x", 100);
csConf.setMaximumCapacityByLabel(queueA1, "x", 100);
// set node -> label
// label x exclusivity is set to true
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", true)));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a");
LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1");
assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB());
assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB());
MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem();
QueueMetrics partXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
QueueMetrics partDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
QueueMetrics queueAPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
QueueMetrics queueAPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
QueueMetrics queueA1PartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1");
QueueMetrics queueA1PartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1");
QueueMetrics queueRootPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root");
QueueMetrics queueRootPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root");
QueueMetrics queueAMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
QueueMetrics queueA1Metrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1");
QueueMetrics queueRootMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
assertEquals(12 * GB, queueAMetrics.getAvailableMB());
assertEquals(12 * GB, queueA1Metrics.getAvailableMB());
assertEquals(12 * GB, queueRootMetrics.getAvailableMB());
assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB());
assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB());
assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB());
assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB());
assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB());
assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB());
assertEquals(10 * GB, partXMetrics.getAvailableMB());
assertEquals(12 * GB, partDefaultMetrics.getAvailableMB());
// app1 -> a
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// app1 asks for 5 partition=x containers
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// app1 gets all resource in partition=x
Assert.assertEquals(6, schedulerNode1.getNumContainers());
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize());
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(12 * GB,
reportNm2.getAvailableResource().getMemorySize());
assertEquals(0 * GB, queueAMetrics.getAllocatedMB());
assertEquals(0 * GB, queueA1Metrics.getAllocatedMB());
assertEquals(0 * GB, queueRootMetrics.getAllocatedMB());
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB());
assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, partXMetrics.getAllocatedMB());
assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB());
// app2 -> a
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// app2 asks for 5 partition= containers
am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
// NM2 do 50 heartbeats
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// app1 gets all resource in partition=x
Assert.assertEquals(6, schedulerNode2.getNumContainers());
reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(4 * GB,
reportNm1.getAvailableResource().getMemorySize());
reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(6 * GB,
reportNm2.getAvailableResource().getMemorySize());
assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
// The total memory tracked by QueueMetrics is 12GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+ rootQueue.getMetrics().getAllocatedMB());
assertEquals(6 * GB, queueAMetrics.getAllocatedMB());
assertEquals(6 * GB, queueA1Metrics.getAllocatedMB());
assertEquals(6 * GB, queueRootMetrics.getAllocatedMB());
assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, partXMetrics.getAllocatedMB());
assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a1");
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
rm1.close();
}
@Test
public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
/**