YARN-6492. Generate queue metrics for each partition. Contributed by Manikandan R

(cherry picked from commit c30c23cb66)
This commit is contained in:
Jonathan Hung 2020-05-26 13:39:08 -07:00
parent 90f57965e9
commit 7a323a45aa
15 changed files with 1750 additions and 243 deletions

View File

@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@ -573,7 +572,7 @@ public class AppSchedulingInfo {
public ContainerRequest allocate(NodeType type,
SchedulerNode node, SchedulerRequestKey schedulerKey,
Container containerAllocated) {
RMContainer containerAllocated) {
writeLock.lock();
try {
if (null != containerAllocated) {
@ -731,7 +730,7 @@ public class AppSchedulingInfo {
}
private void updateMetricsForAllocatedContainer(NodeType type,
SchedulerNode node, Container containerAllocated) {
SchedulerNode node, RMContainer containerAllocated) {
QueueMetrics metrics = queue.getMetrics();
if (pending) {
// once an allocation is done we assume the application is
@ -744,15 +743,20 @@ public class AppSchedulingInfo {
}
public static void updateMetrics(ApplicationId applicationId, NodeType type,
SchedulerNode node, Container containerAllocated, String user,
SchedulerNode node, RMContainer containerAllocated, String user,
Queue queue) {
LOG.debug("allocate: applicationId={} container={} host={} user={}"
+ " resource={} type={}", applicationId, containerAllocated.getId(),
containerAllocated.getNodeId(), user, containerAllocated.getResource(),
+ " resource={} type={}", applicationId,
containerAllocated.getContainer().getId(),
containerAllocated.getNodeId(), user,
containerAllocated.getContainer().getResource(),
type);
if(node != null) {
queue.getMetrics().allocateResources(node.getPartition(), user, 1,
containerAllocated.getResource(), true);
containerAllocated.getContainer().getResource(), false);
queue.getMetrics().decrPendingResources(
containerAllocated.getNodeLabelExpression(), user, 1,
containerAllocated.getContainer().getResource());
}
queue.getMetrics().incrNodeTypeAggregations(user, type);
}
@ -831,4 +835,8 @@ public class AppSchedulingInfo {
this.readLock.unlock();
}
}
public RMContext getRMContext() {
return this.rmContext;
}
}

View File

@ -161,11 +161,17 @@ public class ContainerUpdateContext {
// Decrement the pending using a dummy RR with
// resource = prev update req capability
if (pendingAsk != null && pendingAsk.getCount() > 0) {
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
schedulerKey, Container.newInstance(UNDEFINED,
Container container = Container.newInstance(UNDEFINED,
schedulerNode.getNodeID(), "host:port",
pendingAsk.getPerAllocationResource(),
schedulerKey.getPriority(), null));
schedulerKey.getPriority(), null);
appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode,
schedulerKey,
new RMContainerImpl(container, schedulerKey,
appSchedulingInfo.getApplicationAttemptId(),
schedulerNode.getNodeID(), appSchedulingInfo.getUser(),
appSchedulingInfo.getRMContext(),
appPlacementAllocator.getPrimaryRequestedNodePartition()));
}
}
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
@Metrics(context = "yarn")
public class PartitionQueueMetrics extends QueueMetrics {
private String partition;
protected PartitionQueueMetrics(MetricsSystem ms, String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf,
String partition) {
super(ms, queueName, parent, enableUserMetrics, conf);
this.partition = partition;
if (getParentQueue() != null) {
String newQueueName = (getParentQueue() instanceof CSQueue)
? ((CSQueue) getParentQueue()).getQueuePath()
: getParentQueue().getQueueName();
String parentMetricName =
partition + METRIC_NAME_DELIMITER + newQueueName;
setParent(getQueueMetrics().get(parentMetricName));
}
}
/**
* Partition * Queue * User Metrics
*
* Computes Metrics at Partition (Node Label) * Queue * User Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* QueueMetrics (A)
* usermetrics
* QueueMetrics (A1)
* usermetrics
* QueueMetrics (A2)
* usermetrics
* QueueMetrics (B)
* usermetrics
*
* @return QueueMetrics
*/
@Override
public synchronized QueueMetrics getUserMetrics(String userName) {
if (users == null) {
return null;
}
String partitionJMXStr =
(partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR
: partition;
QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName);
if (metrics == null) {
metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
null, false, this.conf, this.partition);
users.put(userName, metrics);
metricsSystem.register(
pSourceName(partitionJMXStr).append(qSourceName(queueName))
.append(",user=").append(userName).toString(),
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)
.tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName));
}
return metrics;
}
}

View File

@ -25,7 +25,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@ -52,6 +51,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Splitter;
@InterfaceAudience.Private
@ -112,17 +112,34 @@ public class QueueMetrics implements MetricsSource {
info("Queue", "Metrics by queue");
protected static final MetricsInfo USER_INFO =
info("User", "Metrics by user");
protected static final MetricsInfo PARTITION_INFO =
info("Partition", "Metrics by partition");
static final Splitter Q_SPLITTER =
Splitter.on('.').omitEmptyStrings().trimResults();
protected final MetricsRegistry registry;
protected final String queueName;
protected final QueueMetrics parent;
private QueueMetrics parent;
private final Queue parentQueue;
protected final MetricsSystem metricsSystem;
protected final Map<String, QueueMetrics> users;
protected final Configuration conf;
private QueueMetricsForCustomResources queueMetricsForCustomResources;
private final boolean enableUserMetrics;
protected static final MetricsInfo P_RECORD_INFO =
info("PartitionQueueMetrics", "Metrics for the resource scheduler");
// Use "default" to operate NO_LABEL (default) partition internally
public static final String DEFAULT_PARTITION = "default";
// Use "" to register NO_LABEL (default) partition into metrics system
public static final String DEFAULT_PARTITION_JMX_STR = "";
// Metric Name Delimiter
public static final String METRIC_NAME_DELIMITER = ".";
private static final String ALLOCATED_RESOURCE_METRIC_PREFIX =
"AllocatedResource.";
private static final String ALLOCATED_RESOURCE_METRIC_DESC =
@ -148,13 +165,17 @@ public class QueueMetrics implements MetricsSource {
private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC =
"Aggregate Preempted Seconds for NAME";
protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
public QueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
registry = new MetricsRegistry(RECORD_INFO);
this.queueName = queueName;
this.parent = parent != null ? parent.getMetrics() : null;
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>()
: null;
this.parentQueue = parent;
this.users = enableUserMetrics ? new HashMap<String, QueueMetrics>() : null;
this.enableUserMetrics = enableUserMetrics;
metricsSystem = ms;
this.conf = conf;
runningTime = buildBuckets(conf);
@ -176,10 +197,23 @@ public class QueueMetrics implements MetricsSource {
return sb;
}
public synchronized
static QueueMetrics forQueue(String queueName, Queue parent,
boolean enableUserMetrics,
Configuration conf) {
static StringBuilder pSourceName(String partition) {
StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
sb.append(",partition").append('=').append(partition);
return sb;
}
static StringBuilder qSourceName(String queueName) {
StringBuilder sb = new StringBuilder();
int i = 0;
for (String node : Q_SPLITTER.split(queueName)) {
sb.append(",q").append(i++).append('=').append(node);
}
return sb;
}
public synchronized static QueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
return forQueue(DefaultMetricsSystem.instance(), queueName, parent,
enableUserMetrics, conf);
}
@ -203,28 +237,24 @@ public class QueueMetrics implements MetricsSource {
*
* @return A string to {@link QueueMetrics} map.
*/
protected static Map<String, QueueMetrics> getQueueMetrics() {
public static Map<String, QueueMetrics> getQueueMetrics() {
return QUEUE_METRICS;
}
public synchronized
static QueueMetrics forQueue(MetricsSystem ms, String queueName,
Queue parent, boolean enableUserMetrics,
public synchronized static QueueMetrics forQueue(MetricsSystem ms,
String queueName, Queue parent, boolean enableUserMetrics,
Configuration conf) {
QueueMetrics metrics = QUEUE_METRICS.get(queueName);
QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
metrics =
new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf).
tag(QUEUE_INFO, queueName);
metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
.tag(QUEUE_INFO, queueName);
// Register with the MetricsSystems
if (ms != null) {
metrics =
ms.register(
sourceName(queueName).toString(),
metrics = ms.register(sourceName(queueName).toString(),
"Metrics for queue: " + queueName, metrics);
}
QUEUE_METRICS.put(queueName, metrics);
getQueueMetrics().put(queueName, metrics);
}
return metrics;
@ -236,7 +266,8 @@ public class QueueMetrics implements MetricsSource {
}
QueueMetrics metrics = users.get(userName);
if (metrics == null) {
metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf);
metrics =
new QueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),
@ -246,6 +277,96 @@ public class QueueMetrics implements MetricsSource {
return metrics;
}
/**
* Partition * Queue Metrics
*
* Computes Metrics at Partition (Node Label) * Queue Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* QueueMetrics (A)
* metrics
* QueueMetrics (A1)
* metrics
* QueueMetrics (A2)
* metrics
* QueueMetrics (B)
* metrics
*
* @param partition
* @return QueueMetrics
*/
public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
String partitionJMXStr = partition;
if ((partition == null)
|| (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
partition = DEFAULT_PARTITION;
partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
}
String metricName = partition + METRIC_NAME_DELIMITER + this.queueName;
QueueMetrics metrics = getQueueMetrics().get(metricName);
if (metrics == null) {
QueueMetrics queueMetrics =
new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
this.enableUserMetrics, this.conf, partition);
metricsSystem.register(
pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
.toString(),
"Metrics for queue: " + this.queueName,
queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
this.queueName));
getQueueMetrics().put(metricName, queueMetrics);
return queueMetrics;
} else {
return metrics;
}
}
/**
* Partition Metrics
*
* Computes Metrics at Partition (Node Label) Level.
*
* Sample JMX O/P Structure:
*
* PartitionQueueMetrics (labelX)
* metrics
*
* @param partition
* @return QueueMetrics
*/
private QueueMetrics getPartitionMetrics(String partition) {
String partitionJMXStr = partition;
if ((partition == null)
|| (partition.equals(RMNodeLabelsManager.NO_LABEL))) {
partition = DEFAULT_PARTITION;
partitionJMXStr = DEFAULT_PARTITION_JMX_STR;
}
String metricName = partition + METRIC_NAME_DELIMITER;
QueueMetrics metrics = getQueueMetrics().get(metricName);
if (metrics == null) {
metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null,
false, this.conf, partition);
// Register with the MetricsSystems
if (metricsSystem != null) {
metricsSystem.register(pSourceName(partitionJMXStr).toString(),
"Metrics for partition: " + partitionJMXStr,
(PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
partitionJMXStr));
}
getQueueMetrics().put(metricName, metrics);
}
return metrics;
}
private ArrayList<Integer> parseInts(String value) {
ArrayList<Integer> result = new ArrayList<Integer>();
for(String s: value.split(",")) {
@ -386,6 +507,28 @@ public class QueueMetrics implements MetricsSource {
*/
public void setAvailableResourcesToQueue(String partition, Resource limit) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
setAvailableResources(limit);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.setAvailableResources(limit);
if(this.queueName.equals("root")) {
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.setAvailableResources(limit);
}
}
}
}
/**
* Set Available resources with support for resource vectors.
*
* @param limit
*/
public void setAvailableResources(Resource limit) {
availableMB.set(limit.getMemorySize());
availableVCores.set(limit.getVirtualCores());
if (queueMetricsForCustomResources != null) {
@ -395,11 +538,11 @@ public class QueueMetrics implements MetricsSource {
AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC);
}
}
}
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
*
* @param limit resource limit
*/
public void setAvailableResourcesToQueue(Resource limit) {
@ -409,39 +552,67 @@ public class QueueMetrics implements MetricsSource {
/**
* Set available resources. To be called by scheduler periodically as
* resources become available.
*
* @param partition Node Partition
* @param user
* @param limit resource limit
*/
public void setAvailableResourcesToUser(String partition,
String user, Resource limit) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
public void setAvailableResourcesToUser(String partition, String user,
Resource limit) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAvailableResourcesToQueue(partition, limit);
userMetrics.setAvailableResources(limit);
}
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
QueueMetrics partitionUserMetrics =
partitionQueueMetrics.getUserMetrics(user);
if (partitionUserMetrics != null) {
partitionUserMetrics.setAvailableResources(limit);
}
}
}
/**
* Increment pending resource metrics
*
* @param partition Node Partition
* @param user
* @param containers
* @param res the TOTAL delta of resources note this is different from
* the other APIs which use per container resource
* @param res the TOTAL delta of resources note this is different from the
* other APIs which use per container resource
*/
public void incrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_incrPendingResources(containers, res);
internalIncrPendingResources(partition, user, containers, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalIncrPendingResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.incrementPendingResources(containers, res);
}
}
}
public void internalIncrPendingResources(String partition, String user,
int containers, Resource res) {
incrementPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.incrPendingResources(partition, user, containers, res);
userMetrics.internalIncrPendingResources(partition, user, containers,
res);
}
if (parent != null) {
parent.incrPendingResources(partition, user, containers, res);
}
parent.internalIncrPendingResources(partition, user, containers, res);
}
}
@ -503,7 +674,7 @@ public class QueueMetrics implements MetricsSource {
}
}
private void _incrPendingResources(int containers, Resource res) {
private void incrementPendingResources(int containers, Resource res) {
pendingContainers.incr(containers);
pendingMB.incr(res.getMemorySize() * containers);
pendingVCores.incr(res.getVirtualCores() * containers);
@ -516,19 +687,36 @@ public class QueueMetrics implements MetricsSource {
public void decrPendingResources(String partition, String user,
int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
_decrPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.decrPendingResources(partition, user, containers, res);
internalDecrPendingResources(partition, user, containers, res);
}
if (parent != null) {
parent.decrPendingResources(partition, user, containers, res);
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalDecrPendingResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.decrementPendingResources(containers, res);
}
}
}
private void _decrPendingResources(int containers, Resource res) {
protected void internalDecrPendingResources(String partition, String user,
int containers, Resource res) {
decrementPendingResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalDecrPendingResources(partition, user, containers,
res);
}
if (parent != null) {
parent.internalDecrPendingResources(partition, user, containers, res);
}
}
private void decrementPendingResources(int containers, Resource res) {
pendingContainers.decr(containers);
pendingMB.decr(res.getMemorySize() * containers);
pendingVCores.decr(res.getVirtualCores() * containers);
@ -558,12 +746,49 @@ public class QueueMetrics implements MetricsSource {
}
}
public void allocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
public void allocateResources(String partition, String user, int containers,
Resource res, boolean decrPending) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalAllocateResources(partition, user, containers, res, decrPending);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalAllocateResources(partition, user,
containers, res, decrPending);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.computeAllocateResources(containers, res, decrPending);
}
}
}
public void internalAllocateResources(String partition, String user,
int containers, Resource res, boolean decrPending) {
computeAllocateResources(containers, res, decrPending);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalAllocateResources(partition, user, containers, res,
decrPending);
}
if (parent != null) {
parent.internalAllocateResources(partition, user, containers, res,
decrPending);
}
}
/**
* Allocate Resources for a partition with support for resource vectors.
*
* @param containers number of containers
* @param res resource containing memory size, vcores etc
* @param decrPending decides whether to decrease pending resource or not
*/
private void computeAllocateResources(int containers, Resource res,
boolean decrPending) {
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
allocatedMB.incr(res.getMemorySize() * containers);
allocatedVCores.incr(res.getVirtualCores() * containers);
if (queueMetricsForCustomResources != null) {
@ -572,18 +797,8 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
if (decrPending) {
_decrPendingResources(containers, res);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.allocateResources(partition, user,
containers, res, decrPending);
}
if (parent != null) {
parent.allocateResources(partition, user, containers, res, decrPending);
}
decrementPendingResources(containers, res);
}
}
@ -594,7 +809,6 @@ public class QueueMetrics implements MetricsSource {
* @param res
*/
public void allocateResources(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
allocatedMB.incr(res.getMemorySize());
allocatedVCores.incr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
@ -608,8 +822,7 @@ public class QueueMetrics implements MetricsSource {
pendingVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreasePending(res);
registerCustomResources(
queueMetricsForCustomResources.getPendingValues(),
registerCustomResources(queueMetricsForCustomResources.getPendingValues(),
PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC);
}
@ -621,11 +834,45 @@ public class QueueMetrics implements MetricsSource {
parent.allocateResources(partition, user, res);
}
}
public void releaseResources(String partition, String user, int containers,
Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalReleaseResources(partition, user, containers, res);
}
public void releaseResources(String partition,
String user, int containers, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalReleaseResources(partition, user,
containers, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.computeReleaseResources(containers, res);
}
}
}
public void internalReleaseResources(String partition, String user,
int containers, Resource res) {
computeReleaseResources(containers, res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalReleaseResources(partition, user, containers, res);
}
if (parent != null) {
parent.internalReleaseResources(partition, user, containers, res);
}
}
/**
* Release Resources for a partition with support for resource vectors.
*
* @param containers number of containers
* @param res resource containing memory size, vcores etc
*/
private void computeReleaseResources(int containers, Resource res) {
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
allocatedMB.decr(res.getMemorySize() * containers);
@ -636,40 +883,6 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(partition, user, containers, res);
}
if (parent != null) {
parent.releaseResources(partition, user, containers, res);
}
}
}
/**
* Release Resource for container size change.
*
* @param user
* @param res
*/
private void releaseResources(String user, Resource res) {
allocatedMB.decr(res.getMemorySize());
allocatedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
queueMetricsForCustomResources.decreaseAllocated(res);
registerCustomResources(
queueMetricsForCustomResources.getAllocatedValues(),
ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, res);
}
if (parent != null) {
parent.releaseResources(user, res);
}
}
public void preemptContainer() {
@ -728,11 +941,31 @@ public class QueueMetrics implements MetricsSource {
public void reserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
reserveResource(user, res);
internalReserveResources(partition, user, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalReserveResources(partition, user, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.incrReserveResources(res);
}
}
}
public void reserveResource(String user, Resource res) {
protected void internalReserveResources(String partition, String user,
Resource res) {
incrReserveResources(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalReserveResources(partition, user, res);
}
if (parent != null) {
parent.internalReserveResources(partition, user, res);
}
}
public void incrReserveResources(Resource res) {
reservedContainers.incr();
reservedMB.incr(res.getMemorySize());
reservedVCores.incr(res.getVirtualCores());
@ -742,17 +975,37 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.reserveResource(user, res);
}
if (parent != null) {
parent.reserveResource(user, res);
public void unreserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
internalUnReserveResources(partition, user, res);
}
QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.internalUnReserveResources(partition, user, res);
QueueMetrics partitionMetrics = getPartitionMetrics(partition);
if (partitionMetrics != null) {
partitionMetrics.decrReserveResource(res);
}
}
}
private void unreserveResource(String user, Resource res) {
reservedContainers.decr();
protected void internalUnReserveResources(String partition, String user,
Resource res) {
decrReserveResource(res);
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.internalUnReserveResources(partition, user, res);
}
if (parent != null) {
parent.internalUnReserveResources(partition, user, res);
}
}
public void decrReserveResource(Resource res) {
int containers = 1;
reservedContainers.decr(containers);
reservedMB.decr(res.getMemorySize());
reservedVCores.decr(res.getVirtualCores());
if (queueMetricsForCustomResources != null) {
@ -761,19 +1014,6 @@ public class QueueMetrics implements MetricsSource {
queueMetricsForCustomResources.getReservedValues(),
RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC);
}
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.unreserveResource(user, res);
}
if (parent != null) {
parent.unreserveResource(user, res);
}
}
public void unreserveResource(String partition, String user, Resource res) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
unreserveResource(user, res);
}
}
public void incrActiveUsers() {
@ -1021,4 +1261,12 @@ public class QueueMetrics implements MetricsSource {
QueueMetricsForCustomResources metrics) {
this.queueMetricsForCustomResources = metrics;
}
public void setParent(QueueMetrics parent) {
this.parent = parent;
}
public Queue getParentQueue() {
return parentQueue;
}
}

View File

@ -229,7 +229,7 @@ public class CSQueueMetrics extends QueueMetrics {
public synchronized static CSQueueMetrics forQueue(String queueName,
Queue parent, boolean enableUserMetrics, Configuration conf) {
MetricsSystem ms = DefaultMetricsSystem.instance();
QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName);
QueueMetrics metrics = getQueueMetrics().get(queueName);
if (metrics == null) {
metrics =
new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf)
@ -241,7 +241,7 @@ public class CSQueueMetrics extends QueueMetrics {
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
+ queueName, metrics);
}
QueueMetrics.getQueueMetrics().put(queueName, metrics);
getQueueMetrics().put(queueName, metrics);
}
return (CSQueueMetrics) metrics;
@ -254,7 +254,8 @@ public class CSQueueMetrics extends QueueMetrics {
}
CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName);
if (metrics == null) {
metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
metrics =
new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics);
metricsSystem.register(
sourceName(queueName).append(",user=").append(userName).toString(),

View File

@ -1393,8 +1393,9 @@ public class LeafQueue extends AbstractCSQueue {
: getQueueMaxResource(partition);
Resource headroom = Resources.componentwiseMin(
Resources.subtract(userLimitResource, user.getUsed(partition)),
Resources.subtract(currentPartitionResourceLimit,
Resources.subtractNonNegative(userLimitResource,
user.getUsed(partition)),
Resources.subtractNonNegative(currentPartitionResourceLimit,
queueUsage.getUsed(partition)));
// Normalize it before return
headroom =
@ -1714,11 +1715,16 @@ public class LeafQueue extends AbstractCSQueue {
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, true);
// Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
Resource partitionHeadroom = Resources.createResource(0, 0);
if (metrics.getUserMetrics(userName) != null) {
partitionHeadroom = getHeadroom(user,
cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
getResourceLimitForActiveUsers(userName, clusterResource,
nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
nodePartition);
}
metrics.setAvailableResourcesToUser(nodePartition, userName,
partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(getQueuePath() + " user=" + userName + " used="
@ -1757,8 +1763,16 @@ public class LeafQueue extends AbstractCSQueue {
User user = usersManager.updateUserResourceUsage(userName, resource,
nodePartition, false);
metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
Resource partitionHeadroom = Resources.createResource(0, 0);
if (metrics.getUserMetrics(userName) != null) {
partitionHeadroom = getHeadroom(user,
cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
getResourceLimitForActiveUsers(userName, clusterResource,
nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
nodePartition);
}
metrics.setAvailableResourcesToUser(nodePartition, userName,
partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(

View File

@ -609,7 +609,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getSchedulerRequestKey(),
schedulerContainer.getRmContainer().getContainer());
schedulerContainer.getRmContainer());
((RMContainerImpl) rmContainer).setContainerRequest(
containerRequest);
@ -623,7 +623,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
AppSchedulingInfo.updateMetrics(getApplicationId(),
allocation.getAllocationLocalityType(),
schedulerContainer.getSchedulerNode(),
schedulerContainer.getRmContainer().getContainer(), getUser(),
schedulerContainer.getRmContainer(), getUser(),
getQueue());
}

View File

@ -462,7 +462,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
liveContainers.put(container.getId(), rmContainer);
// Update consumption and track allocations
ContainerRequest containerRequest = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
type, node, schedulerKey, rmContainer);
this.attemptResourceUsage.incUsed(container.getResource());
getQueue().incUsedResource(container.getResource());

View File

@ -83,7 +83,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp {
// Update consumption and track allocations
ContainerRequest containerRequest = appSchedulingInfo.allocate(
type, node, schedulerKey, container);
type, node, schedulerKey, rmContainer);
attemptResourceUsage.incUsed(node.getPartition(),
container.getResource());

View File

@ -0,0 +1,752 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestPartitionQueueMetrics {
static final int GB = 1024; // MB
private static final Configuration CONF = new Configuration();
private MetricsSystem ms;
@Before
public void setUp() {
ms = new MetricsSystemImpl();
QueueMetrics.clearQueueMetrics();
PartitionQueueMetrics.clearQueueMetrics();
}
@After
public void tearDown() {
ms.shutdown();
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in only 1 partition, x.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test
public void testSinglePartitionWithSingleLevelQueueMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
String user = "alice";
QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
MetricsSource q2Source = queueSource(ms, "x", "root.q2");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in both partitions, x & y.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test
public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception {
String parentQueueName = "root";
String user = "alice";
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource xPartitionSource = partitionSource(ms, "x");
MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
root.setAvailableResourcesToQueue("y",
Resources.createResource(400 * GB, 400));
q2.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1));
MetricsSource yPartitionSource = partitionSource(ms, "y");
MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
MetricsSource q2Source = queueSource(ms, "y", "root.q2");
checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3);
checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
}
/**
* Structure:
* Both queues, q1 has been configured to run in multiple partitions, x & y.
*
* root
* /
* q1
*
* @throws Exception
*/
@Test
public void testMultiplePartitionWithSingleQueueMetrics() throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
root.setAvailableResourcesToQueue("y",
Resources.createResource(300 * GB, 300));
q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1));
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5);
checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
q1.incrPendingResources("x", "test_user1", 4,
Resource.newInstance(1024, 1));
MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9);
checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9);
checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4);
q1.incrPendingResources("y", "test_user1", 6,
Resource.newInstance(1024, 1));
MetricsSource partitionSourceY = partitionSource(ms, "y");
MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName);
MetricsSource q1SourceY = queueSource(ms, "y", "root.q1");
MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1");
checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6);
checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in both partitions, x & y.
*
* root
* / \
* q1 q2
* q1
* / \
* q11 q12
* q2
* / \
* q21 q22
*
* @throws Exception
*/
@Test
public void testMultiplePartitionsWithMultiLevelQueuesMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
QueueMetrics root =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(root);
QueueMetrics q1 =
QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF);
Queue childQueue1 = mock(Queue.class);
when(childQueue1.getQueueName()).thenReturn("root.q1");
when(childQueue1.getMetrics()).thenReturn(q1);
QueueMetrics q11 =
QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF);
QueueMetrics q12 =
QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF);
QueueMetrics q2 =
QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF);
Queue childQueue2 = mock(Queue.class);
when(childQueue2.getQueueName()).thenReturn("root.q2");
when(childQueue2.getMetrics()).thenReturn(q2);
QueueMetrics q21 =
QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF);
QueueMetrics q22 =
QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("x",
Resources.createResource(100 * GB, 100));
q11.setAvailableResourcesToQueue("x",
Resources.createResource(50 * GB, 50));
q11.incrPendingResources("x", "test_user", 2,
Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(ms, "x");
MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName);
MetricsSource q1Source = queueSource(ms, "x", "root.q1");
MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2);
q11.incrPendingResources("x", "test_user", 4,
Resource.newInstance(1024, 1));
MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6);
checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
q11.incrPendingResources("x", "test_user1", 5,
Resource.newInstance(1024, 1));
MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1");
MetricsSource userSource1 =
userSource(ms, "x", "test_user1", "root.q1.q11");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11);
checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11);
checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6);
checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5);
q12.incrPendingResources("x", "test_user", 5,
Resource.newInstance(1024, 1));
MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16);
checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16);
checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
root.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q1.setAvailableResourcesToQueue("y",
Resources.createResource(100 * GB, 100));
q12.setAvailableResourcesToQueue("y",
Resources.createResource(50 * GB, 50));
q12.incrPendingResources("y", "test_user", 3,
Resource.newInstance(1024, 1));
MetricsSource yPartitionSource = partitionSource(ms, "y");
MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName);
MetricsSource q1YSource = queueSource(ms, "y", "root.q1");
MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3);
checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3);
checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3);
root.setAvailableResourcesToQueue("y",
Resources.createResource(200 * GB, 200));
q2.setAvailableResourcesToQueue("y",
Resources.createResource(100 * GB, 100));
q21.setAvailableResourcesToQueue("y",
Resources.createResource(50 * GB, 50));
q21.incrPendingResources("y", "test_user", 5,
Resource.newInstance(1024, 1));
MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21");
MetricsSource q2YSource = queueSource(ms, "y", "root.q2");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8);
checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5);
checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5);
q22.incrPendingResources("y", "test_user", 6,
Resource.newInstance(1024, 1));
MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22");
checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14);
checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6);
}
@Test
public void testTwoLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String user = "alice";
String partition = "x";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics.submitApp(user);
metrics.submitAppAttempt(user);
parentMetrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
metrics.setAvailableResourcesToQueue(partition,
Resources.createResource(100 * GB, 100));
parentMetrics.setAvailableResourcesToUser(partition, user,
Resources.createResource(10 * GB, 10));
metrics.setAvailableResourcesToUser(partition, user,
Resources.createResource(10 * GB, 10));
metrics.incrPendingResources(partition, user, 6,
Resources.createResource(3 * GB, 3));
MetricsSource partitionSource = partitionSource(ms, partition);
MetricsSource parentQueueSource =
queueSource(ms, partition, parentQueueName);
MetricsSource queueSource = queueSource(ms, partition, leafQueueName);
MetricsSource userSource = userSource(ms, partition, user, leafQueueName);
MetricsSource userSource1 =
userSource(ms, partition, user, parentQueueName);
checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0,
0, 0);
checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0,
0);
checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
0, 0);
checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18,
6, 0, 0, 0);
metrics.runAppAttempt(app.getApplicationId(), user);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
metrics.reserveResource(partition, user,
Resources.createResource(3 * GB, 3));
// Available resources is set externally, as it depends on dynamic
// configurable cluster/queue resources
checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15,
3, 3 * GB, 3, 1);
checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100,
15 * GB, 15, 3, 3 * GB, 3, 1);
checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB,
15, 3, 3 * GB, 3, 1);
checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
3 * GB, 3, 1);
checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3,
3 * GB, 3, 1);
metrics.allocateResources(partition, user, 3,
Resources.createResource(1 * GB, 1), true);
checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12,
0, 3 * GB, 3, 1);
checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100,
12 * GB, 12, 0, 3 * GB, 3, 1);
metrics.releaseResources(partition, user, 1,
Resources.createResource(2 * GB, 2));
metrics.unreserveResource(partition, user,
Resources.createResource(3 * GB, 3));
checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12,
0, 0, 0, 0);
checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100,
12 * GB, 12, 0, 0, 0, 0);
checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB,
12, 0, 0, 0, 0);
checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
0, 0, 0);
checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0,
0, 0, 0);
metrics.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
metrics.finishApp(user, RMAppState.FINISHED);
}
@Test
public void testThreeLevelWithUserMetrics() {
String parentQueueName = "root";
String leafQueueName = "root.leaf";
String leafQueueName1 = "root.leaf.leaf1";
String user = "alice";
String partitionX = "x";
String partitionY = "y";
QueueMetrics parentMetrics =
QueueMetrics.forQueue(parentQueueName, null, true, CONF);
Queue parentQueue = mock(Queue.class);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
when(parentQueue.getMetrics()).thenReturn(parentMetrics);
QueueMetrics metrics =
QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF);
Queue leafQueue = mock(Queue.class);
when(leafQueue.getQueueName()).thenReturn(leafQueueName);
when(leafQueue.getMetrics()).thenReturn(metrics);
QueueMetrics metrics1 =
QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF);
AppSchedulingInfo app = mockApp(user);
metrics1.submitApp(user);
metrics1.submitAppAttempt(user);
parentMetrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(200 * GB, 200));
parentMetrics.setAvailableResourcesToQueue(partitionY,
Resources.createResource(500 * GB, 500));
metrics.setAvailableResourcesToQueue(partitionX,
Resources.createResource(100 * GB, 100));
metrics.setAvailableResourcesToQueue(partitionY,
Resources.createResource(400 * GB, 400));
metrics1.setAvailableResourcesToQueue(partitionX,
Resources.createResource(50 * GB, 50));
metrics1.setAvailableResourcesToQueue(partitionY,
Resources.createResource(300 * GB, 300));
parentMetrics.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(20 * GB, 20));
parentMetrics.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(50 * GB, 50));
metrics.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(10 * GB, 10));
metrics.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(40 * GB, 40));
metrics1.setAvailableResourcesToUser(partitionX, user,
Resources.createResource(5 * GB, 5));
metrics1.setAvailableResourcesToUser(partitionY, user,
Resources.createResource(30 * GB, 30));
metrics1.incrPendingResources(partitionX, user, 6,
Resources.createResource(3 * GB, 3));
metrics1.incrPendingResources(partitionY, user, 6,
Resources.createResource(4 * GB, 4));
MetricsSource partitionSourceX =
partitionSource(metrics1.getMetricsSystem(), partitionX);
MetricsSource parentQueueSourceWithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName);
MetricsSource queueSourceWithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName);
MetricsSource queueSource1WithPartX =
queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1);
MetricsSource parentUserSourceWithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, parentQueueName);
MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, leafQueueName);
MetricsSource userSource1WithPartX = userSource(metrics1.getMetricsSystem(),
partitionX, user, leafQueueName1);
checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18,
6, 0, 0, 0);
checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB,
18, 6, 0, 0, 0);
checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6,
0, 0, 0);
checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB, 18, 6,
0, 0, 0);
checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20, 18 * GB, 18,
6, 0, 0, 0);
checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0,
0, 0);
checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18, 6, 0,
0, 0);
MetricsSource partitionSourceY =
partitionSource(metrics1.getMetricsSystem(), partitionY);
MetricsSource parentQueueSourceWithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName);
MetricsSource queueSourceWithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName);
MetricsSource queueSource1WithPartY =
queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1);
MetricsSource parentUserSourceWithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, parentQueueName);
MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, leafQueueName);
MetricsSource userSource1WithPartY = userSource(metrics1.getMetricsSystem(),
partitionY, user, leafQueueName1);
checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24,
6, 0, 0, 0);
checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB,
24, 6, 0, 0, 0);
checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400, 24 * GB, 24, 6,
0, 0, 0);
checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300, 24 * GB, 24, 6,
0, 0, 0);
checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50, 24 * GB, 24,
6, 0, 0, 0);
checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB, 24, 6, 0,
0, 0);
checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB, 24, 6, 0,
0, 0);
metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(),
app.getUser());
metrics1.finishApp(user, RMAppState.FINISHED);
}
/**
* Structure:
* Both queues, q1 & q2 has been configured to run in only 1 partition, x
* UserMetrics has been disabled, hence trying to access the user source
* throws NPE from sources.
*
* root
* / \
* q1 q2
*
* @throws Exception
*/
@Test(expected = NullPointerException.class)
public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics()
throws Exception {
String parentQueueName = "root";
Queue parentQueue = mock(Queue.class);
String user = "alice";
QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF);
when(parentQueue.getMetrics()).thenReturn(root);
when(parentQueue.getQueueName()).thenReturn(parentQueueName);
CSQueueMetrics q1 =
CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF);
CSQueueMetrics q2 =
CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF);
AppSchedulingInfo app = mockApp(user);
q1.submitApp(user);
q1.submitAppAttempt(user);
root.setAvailableResourcesToQueue("x",
Resources.createResource(200 * GB, 200));
q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1));
MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x");
MetricsSource rootQueueSource =
queueSource(q1.getMetricsSystem(), "x", parentQueueName);
MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1");
MetricsSource q1UserSource =
userSource(q1.getMetricsSystem(), "x", user, "root.q1");
checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2);
checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2);
q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1));
MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2");
MetricsSource q2UserSource =
userSource(q1.getMetricsSystem(), "x", user, "root.q2");
checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5);
checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3);
q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser());
q1.finishApp(user, RMAppState.FINISHED);
}
public static MetricsSource partitionSource(MetricsSystem ms,
String partition) {
MetricsSource s =
ms.getSource(QueueMetrics.pSourceName(partition).toString());
return s;
}
public static MetricsSource queueSource(MetricsSystem ms, String partition,
String queue) {
MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition)
.append(QueueMetrics.qSourceName(queue)).toString());
return s;
}
public static MetricsSource userSource(MetricsSystem ms, String partition,
String user, String queue) {
MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition)
.append(QueueMetrics.qSourceName(queue)).append(",user=")
.append(user).toString());
return s;
}
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long availableMB, int availableCores,
long pendingMB, int pendingCores, int pendingCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
}
private static AppSchedulingInfo mockApp(String user) {
AppSchedulingInfo app = mock(AppSchedulingInfo.class);
when(app.getUser()).thenReturn(user);
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1);
when(app.getApplicationAttemptId()).thenReturn(id);
return app;
}
public static void checkResources(MetricsSource source, long allocatedMB,
int allocatedCores, int allocCtnrs, long aggreAllocCtnrs,
long aggreReleasedCtnrs, long availableMB, int availableCores,
long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB,
int reservedCores, int reservedCtnrs) {
MetricsRecordBuilder rb = getMetrics(source);
assertGauge("AllocatedMB", allocatedMB, rb);
assertGauge("AllocatedVCores", allocatedCores, rb);
assertGauge("AllocatedContainers", allocCtnrs, rb);
assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb);
assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb);
assertGauge("AvailableMB", availableMB, rb);
assertGauge("AvailableVCores", availableCores, rb);
assertGauge("PendingMB", pendingMB, rb);
assertGauge("PendingVCores", pendingCores, rb);
assertGauge("PendingContainers", pendingCtnrs, rb);
assertGauge("ReservedMB", reservedMB, rb);
assertGauge("ReservedVCores", reservedCores, rb);
assertGauge("ReservedContainers", reservedCtnrs, rb);
}
}

View File

@ -99,7 +99,7 @@ public class TestSchedulerApplicationAttempt {
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
toSchedulerKey(requestedPriority), container1.getContainer());
toSchedulerKey(requestedPriority), container1);
// Active user count has to decrease from queue2 due to app has NO pending requests
assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers());
@ -142,7 +142,7 @@ public class TestSchedulerApplicationAttempt {
app.liveContainers.put(container1.getContainerId(), container1);
SchedulerNode node = createNode();
app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node,
toSchedulerKey(requestedPriority), container1.getContainer());
toSchedulerKey(requestedPriority), container1);
// Reserved container
Priority prio1 = Priority.newInstance(1);

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -541,6 +542,11 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
}
protected MockRM setupSchedulerInstance() throws Exception {
if (mockRM != null) {
mockRM.stop();
}
CapacitySchedulerConfiguration conf = setupSchedulerConfiguration();
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
@ -639,16 +645,26 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
autoCreatedLeafQueue.getMaximumAllocation().getMemorySize());
}
protected void validateInitialQueueEntitlement(CSQueue parentQueue, String
leafQueueName, Map<String, Float>
expectedTotalChildQueueAbsCapacityByLabel,
protected void validateInitialQueueEntitlement(CSQueue parentQueue,
String leafQueueName,
Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
Set<String> nodeLabels)
throws SchedulerDynamicEditException, InterruptedException {
validateInitialQueueEntitlement(cs, parentQueue, leafQueueName,
validateInitialQueueEntitlement(mockRM, cs, parentQueue, leafQueueName,
expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
}
protected void validateInitialQueueEntitlement(
protected void validateInitialQueueEntitlement(ResourceManager rm,
CSQueue parentQueue, String leafQueueName,
Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
Set<String> nodeLabels)
throws SchedulerDynamicEditException, InterruptedException {
validateInitialQueueEntitlement(rm,
(CapacityScheduler) rm.getResourceScheduler(), parentQueue,
leafQueueName, expectedTotalChildQueueAbsCapacityByLabel, nodeLabels);
}
protected void validateInitialQueueEntitlement(ResourceManager rm,
CapacityScheduler capacityScheduler, CSQueue parentQueue,
String leafQueueName,
Map<String, Float> expectedTotalChildQueueAbsCapacityByLabel,
@ -661,7 +677,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
(GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue
.getAutoCreatedQueueManagementPolicy();
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
AutoCreatedLeafQueue leafQueue =
(AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName);
Map<String, QueueEntitlement> expectedEntitlements = new HashMap<>();
QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
@ -679,7 +696,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
expectedEntitlements.put(label, expectedEntitlement);
validateEffectiveMinResource(leafQueue, label, expectedEntitlements);
validateEffectiveMinResource(rm, capacityScheduler, leafQueue, label,
expectedEntitlements);
}
}
@ -696,24 +714,24 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
.getMaximumCapacity(label), EPSILON);
}
protected void validateEffectiveMinResource(CSQueue leafQueue,
String label, Map<String, QueueEntitlement> expectedQueueEntitlements) {
protected void validateEffectiveMinResource(ResourceManager rm,
CapacityScheduler cs, CSQueue leafQueue, String label,
Map<String, QueueEntitlement> expectedQueueEntitlements) {
ManagedParentQueue parentQueue = (ManagedParentQueue) leafQueue.getParent();
Resource resourceByLabel = mockRM.getRMContext().getNodeLabelManager().
getResourceByLabel(label, cs.getClusterResource());
Resource resourceByLabel = rm.getRMContext().getNodeLabelManager()
.getResourceByLabel(label, cs.getClusterResource());
Resource effMinCapacity = Resources.multiply(resourceByLabel,
expectedQueueEntitlements.get(label).getCapacity() * parentQueue
.getQueueCapacities().getAbsoluteCapacity(label));
expectedQueueEntitlements.get(label).getCapacity()
* parentQueue.getQueueCapacities().getAbsoluteCapacity(label));
assertEquals(effMinCapacity, Resources.multiply(resourceByLabel,
leafQueue.getQueueCapacities().getAbsoluteCapacity(label)));
assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label));
if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) {
assertTrue(Resources
.greaterThan(cs.getResourceCalculator(), cs.getClusterResource(),
effMinCapacity, Resources.none()));
} else{
assertTrue(Resources.greaterThan(cs.getResourceCalculator(),
cs.getClusterResource(), effMinCapacity, Resources.none()));
} else {
assertTrue(Resources.equals(effMinCapacity, Resources.none()));
}
}
@ -824,7 +842,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
updatedQueueTemplate.getQueueCapacities().getMaximumCapacity
(label));
assertEquals(expectedQueueEntitlements.get(label), newEntitlement);
validateEffectiveMinResource(leafQueue, label,
validateEffectiveMinResource(mockRM, cs, leafQueue, label,
expectedQueueEntitlements);
}
found = true;

View File

@ -670,7 +670,7 @@ public class TestCapacitySchedulerAutoQueueCreation
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
Map<String, Float> expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(newCS, parentQueue, USER1,
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
@ -678,7 +678,7 @@ public class TestCapacitySchedulerAutoQueueCreation
1);
expectedAbsChildQueueCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(newCS, parentQueue, USER2,
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2,
expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC);
//validate total activated abs capacity remains the same
@ -773,7 +773,7 @@ public class TestCapacitySchedulerAutoQueueCreation
Map<String, Float> expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(1);
validateInitialQueueEntitlement(newCS, parentQueue, USER1,
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
//submit another app2 as USER2
@ -782,7 +782,7 @@ public class TestCapacitySchedulerAutoQueueCreation
1);
expectedChildQueueAbsCapacity =
populateExpectedAbsCapacityByLabelForParentQueue(2);
validateInitialQueueEntitlement(newCS, parentQueue, USER2,
validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
//update parent queue capacity

View File

@ -1237,9 +1237,9 @@ public class TestLeafQueue {
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1),
null, null);
"", null);
qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1),
null, null);
"", null);
qb.setUserLimit(50);
qb.setUserLimitFactor(1);

View File

@ -25,9 +25,11 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
@ -49,11 +51,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -2338,10 +2343,9 @@ public class TestNodeLabelContainerAllocation {
assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB());
assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB());
// The total memory tracked by QueueMetrics is 0GB for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB()
+ rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
@ -2390,6 +2394,8 @@ public class TestNodeLabelContainerAllocation {
csConf.setCapacityByLabel(queueB, "x", 50);
csConf.setMaximumCapacityByLabel(queueB, "x", 50);
csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
// set node -> label
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", false)));
@ -2408,6 +2414,54 @@ public class TestNodeLabelContainerAllocation {
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <no_label>
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
double delta = 0.0001;
CSQueue leafQueue = cs.getQueue("a");
CSQueue leafQueueB = cs.getQueue("b");
CSQueue rootQueue = cs.getRootQueue();
assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem();
QueueMetrics partXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
QueueMetrics partDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
QueueMetrics queueAMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
QueueMetrics queueBMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b");
QueueMetrics queueAPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
QueueMetrics queueAPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
QueueMetrics queueBPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b");
QueueMetrics queueBPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b");
QueueMetrics rootMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta);
assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
// app1 -> a
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
@ -2423,47 +2477,73 @@ public class TestNodeLabelContainerAllocation {
// app1 asks for 3 partition= containers
am1.allocate("*", 1 * GB, 3, new ArrayList<ContainerId>());
// NM1 do 50 heartbeats
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// app1 gets all resource in partition=x (non-exclusive)
Assert.assertEquals(3, schedulerNode1.getNumContainers());
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(7 * GB,
reportNm1.getAvailableResource().getMemorySize());
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(9 * GB,
reportNm2.getAvailableResource().getMemorySize());
LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
// 3GB is used from label x quota. 1.5 GB is remaining from default label.
// 2GB is remaining from label x.
assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB());
assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta);
assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta);
assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta);
assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta);
assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB());
assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta);
QueueMetrics partDefaultQueueAUserMetrics =
(QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user",
"root.a");
QueueMetrics partXQueueAUserMetrics =
(QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user",
"root.a");
QueueMetrics queueAUserMetrics =
(QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user");
assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta);
assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
// app1 asks for 1 default partition container
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
// NM2 do couple of heartbeats
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
// app1 gets all resource in default partition
Assert.assertEquals(2, schedulerNode2.getNumContainers());
Assert.assertEquals(3, schedulerNode1.getNumContainers());
// 3GB is used from label x quota. 2GB used from default label.
// So 0.5 GB is remaining from default label.
@ -2472,10 +2552,100 @@ public class TestNodeLabelContainerAllocation {
// The total memory tracked by QueueMetrics is 10GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
delta);
assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta);
assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
// Pending Resources when containers are waiting on "default" partition
assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta);
assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
delta);
assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta);
assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAMetrics.getAllocatedMB());
assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(),
delta);
assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta);
assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta);
assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta);
assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta);
assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta);
assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta);
// Pending Resources after containers has been assigned on "x" partition
assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(),
delta);
assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta);
assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta);
assertEquals(0 * GB, partXMetrics.getPendingMB(), delta);
rm1.killApp(app1.getApplicationId());
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta);
assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta);
assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta);
assertEquals(2, queueAMetrics.getAggregateAllocatedContainers());
assertEquals(2, queueAMetrics.getAggegatedReleasedContainers());
assertEquals(2, queueAPartDefaultMetrics.getAggregateAllocatedContainers());
assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers());
assertEquals(7, partXMetrics.getAggregateAllocatedContainers());
assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers());
assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers());
assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers());
assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta);
assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta);
assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta);
rm1.close();
}
@ -2606,8 +2776,8 @@ public class TestNodeLabelContainerAllocation {
// The total memory tracked by QueueMetrics is 12GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() +
rootQueue.getMetrics().getAllocatedMB());
assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+ rootQueue.getMetrics().getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a");
@ -2619,6 +2789,207 @@ public class TestNodeLabelContainerAllocation {
rm1.close();
}
@Test
public void testTwoLevelQueueMetricsWithLabels() throws Exception {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a"});
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String queueA = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(queueA, 100);
csConf.setAccessibleNodeLabels(queueA, toSet("x"));
csConf.setCapacityByLabel(queueA, "x", 100);
csConf.setMaximumCapacityByLabel(queueA, "x", 100);
csConf.setQueues(queueA, new String[] {"a1"});
final String queueA1 = queueA + ".a1";
csConf.setCapacity(queueA1, 100);
csConf.setAccessibleNodeLabels(queueA1, toSet("x"));
csConf.setCapacityByLabel(queueA1, "x", 100);
csConf.setMaximumCapacityByLabel(queueA1, "x", 100);
// set node -> label
// label x exclusivity is set to true
mgr.addToCluserNodeLabels(
ImmutableSet.of(NodeLabel.newInstance("x", true)));
mgr.addLabelsToNode(
ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm1 = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = <no_label>
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a");
LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1");
assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB());
assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB());
MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem();
QueueMetrics partXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x");
QueueMetrics partDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "");
QueueMetrics queueAPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a");
QueueMetrics queueAPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a");
QueueMetrics queueA1PartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1");
QueueMetrics queueA1PartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1");
QueueMetrics queueRootPartDefaultMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root");
QueueMetrics queueRootPartXMetrics =
(QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root");
QueueMetrics queueAMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a");
QueueMetrics queueA1Metrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1");
QueueMetrics queueRootMetrics =
(QueueMetrics) TestQueueMetrics.queueSource(ms, "root");
assertEquals(12 * GB, queueAMetrics.getAvailableMB());
assertEquals(12 * GB, queueA1Metrics.getAvailableMB());
assertEquals(12 * GB, queueRootMetrics.getAvailableMB());
assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB());
assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB());
assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB());
assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB());
assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB());
assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB());
assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB());
assertEquals(10 * GB, partXMetrics.getAvailableMB());
assertEquals(12 * GB, partDefaultMetrics.getAvailableMB());
// app1 -> a
RMApp app1 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("a1")
.withAmLabel("x")
.build());
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// app1 asks for 5 partition=x containers
am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "x");
// NM1 do 50 heartbeats
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
// app1 gets all resource in partition=x
Assert.assertEquals(6, schedulerNode1.getNumContainers());
SchedulerNodeReport reportNm1 = rm1.getResourceScheduler()
.getNodeReport(nm1.getNodeId());
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize());
SchedulerNodeReport reportNm2 = rm1.getResourceScheduler()
.getNodeReport(nm2.getNodeId());
Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(12 * GB,
reportNm2.getAvailableResource().getMemorySize());
assertEquals(0 * GB, queueAMetrics.getAllocatedMB());
assertEquals(0 * GB, queueA1Metrics.getAllocatedMB());
assertEquals(0 * GB, queueRootMetrics.getAllocatedMB());
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB());
assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB());
assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, partXMetrics.getAllocatedMB());
assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB());
// app2 -> a
RMApp app2 = MockRMAppSubmitter.submit(rm1,
MockRMAppSubmissionData.Builder.createWithMemory(1 * GB, rm1)
.withAppName("app")
.withUser("user")
.withAcls(null)
.withQueue("a1")
.withAmLabel("")
.build());
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
// app2 asks for 5 partition= containers
am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>(), "");
// NM2 do 50 heartbeats
RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId());
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
}
// app1 gets all resource in partition=x
Assert.assertEquals(6, schedulerNode2.getNumContainers());
reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId());
Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize());
Assert.assertEquals(4 * GB,
reportNm1.getAvailableResource().getMemorySize());
reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId());
Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize());
Assert.assertEquals(6 * GB,
reportNm2.getAvailableResource().getMemorySize());
assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB());
assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB());
// The total memory tracked by QueueMetrics is 12GB
// for the default partition
CSQueue rootQueue = cs.getRootQueue();
assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB()
+ rootQueue.getMetrics().getAllocatedMB());
assertEquals(6 * GB, queueAMetrics.getAllocatedMB());
assertEquals(6 * GB, queueA1Metrics.getAllocatedMB());
assertEquals(6 * GB, queueRootMetrics.getAllocatedMB());
assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB());
assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB());
assertEquals(6 * GB, partXMetrics.getAllocatedMB());
assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB());
// Kill all apps in queue a
cs.killAllAppsInQueue("a1");
rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED);
rm1.waitForAppRemovedFromScheduler(app1.getApplicationId());
assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB());
assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores());
rm1.close();
}
@Test
public void testQueueMetricsWithLabelsDisableElasticity() throws Exception {
/**