diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 736110b0323..de8181ef8ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; @@ -549,7 +548,7 @@ public class AppSchedulingInfo { public ContainerRequest allocate(NodeType type, SchedulerNode node, SchedulerRequestKey schedulerKey, - Container containerAllocated) { + RMContainer containerAllocated) { try { writeLock.lock(); @@ -708,7 +707,7 @@ public class AppSchedulingInfo { } private void updateMetricsForAllocatedContainer(NodeType type, - SchedulerNode node, Container containerAllocated) { + SchedulerNode node, RMContainer containerAllocated) { QueueMetrics metrics = queue.getMetrics(); if (pending) { // once an allocation is done we assume the application is @@ -721,18 +720,21 @@ public class AppSchedulingInfo { } public static void updateMetrics(ApplicationId applicationId, NodeType type, - SchedulerNode node, Container containerAllocated, String user, + SchedulerNode node, RMContainer containerAllocated, String user, Queue queue) { if (LOG.isDebugEnabled()) { LOG.debug("allocate: applicationId=" + applicationId + " container=" - + containerAllocated.getId() + " host=" + containerAllocated - .getNodeId().toString() + " user=" + user + " resource=" - + containerAllocated.getResource() + " type=" - + type); + + containerAllocated.getContainer().getId() + " host=" + + containerAllocated.getNodeId().toString() + " user=" + + user + " resource=" + + containerAllocated.getContainer().getResource() + " type=" + type); } if(node != null) { queue.getMetrics().allocateResources(node.getPartition(), user, 1, - containerAllocated.getResource(), true); + containerAllocated.getContainer().getResource(), false); + queue.getMetrics().decrPendingResources( + containerAllocated.getNodeLabelExpression(), user, 1, + containerAllocated.getContainer().getResource()); } queue.getMetrics().incrNodeTypeAggregations(user, type); } @@ -809,4 +811,8 @@ public class AppSchedulingInfo { this.readLock.unlock(); } } + + public RMContext getRMContext() { + return this.rmContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java index 491a9ce87ad..45db040c3e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerUpdateContext.java @@ -161,11 +161,17 @@ public class ContainerUpdateContext { // Decrement the pending using a dummy RR with // resource = prev update req capability if (pendingAsk != null && pendingAsk.getCount() > 0) { + Container container = Container.newInstance(UNDEFINED, + schedulerNode.getNodeID(), "host:port", + pendingAsk.getPerAllocationResource(), + schedulerKey.getPriority(), null); appSchedulingInfo.allocate(NodeType.OFF_SWITCH, schedulerNode, - schedulerKey, Container.newInstance(UNDEFINED, - schedulerNode.getNodeID(), "host:port", - pendingAsk.getPerAllocationResource(), - schedulerKey.getPriority(), null)); + schedulerKey, + new RMContainerImpl(container, schedulerKey, + appSchedulingInfo.getApplicationAttemptId(), + schedulerNode.getNodeID(), appSchedulingInfo.getUser(), + appSchedulingInfo.getRMContext(), + appPlacementAllocator.getPrimaryRequestedNodePartition())); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java new file mode 100644 index 00000000000..f43131809a0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; + +@Metrics(context = "yarn") +public class PartitionQueueMetrics extends QueueMetrics { + + private String partition; + + protected PartitionQueueMetrics(MetricsSystem ms, String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf, + String partition) { + super(ms, queueName, parent, enableUserMetrics, conf); + this.partition = partition; + if (getParentQueue() != null) { + String newQueueName = (getParentQueue() instanceof CSQueue) + ? ((CSQueue) getParentQueue()).getQueuePath() + : getParentQueue().getQueueName(); + String parentMetricName = + partition + METRIC_NAME_DELIMITER + newQueueName; + setParent(getQueueMetrics().get(parentMetricName)); + } + } + + /** + * Partition * Queue * User Metrics + * + * Computes Metrics at Partition (Node Label) * Queue * User Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * QueueMetrics (A) + * usermetrics + * QueueMetrics (A1) + * usermetrics + * QueueMetrics (A2) + * usermetrics + * QueueMetrics (B) + * usermetrics + * + * @return QueueMetrics + */ + @Override + public synchronized QueueMetrics getUserMetrics(String userName) { + if (users == null) { + return null; + } + + String partitionJMXStr = + (partition.equals(DEFAULT_PARTITION)) ? DEFAULT_PARTITION_JMX_STR + : partition; + + QueueMetrics metrics = (PartitionQueueMetrics) users.get(userName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName, + null, false, this.conf, this.partition); + users.put(userName, metrics); + metricsSystem.register( + pSourceName(partitionJMXStr).append(qSourceName(queueName)) + .append(",user=").append(userName).toString(), + "Metrics for user '" + userName + "' in queue '" + queueName + "'", + ((PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr) + .tag(QUEUE_INFO, queueName)).tag(USER_INFO, userName)); + } + return metrics; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index b05a0ae1f49..4d8a69cd4fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; @@ -54,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Splitter; @InterfaceAudience.Private @@ -114,17 +114,34 @@ public class QueueMetrics implements MetricsSource { info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); + protected static final MetricsInfo PARTITION_INFO = + info("Partition", "Metrics by partition"); static final Splitter Q_SPLITTER = Splitter.on('.').omitEmptyStrings().trimResults(); protected final MetricsRegistry registry; protected final String queueName; - protected final QueueMetrics parent; + private QueueMetrics parent; + private final Queue parentQueue; protected final MetricsSystem metricsSystem; protected final Map users; protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + private final boolean enableUserMetrics; + + protected static final MetricsInfo P_RECORD_INFO = + info("PartitionQueueMetrics", "Metrics for the resource scheduler"); + + // Use "default" to operate NO_LABEL (default) partition internally + public static final String DEFAULT_PARTITION = "default"; + + // Use "" to register NO_LABEL (default) partition into metrics system + public static final String DEFAULT_PARTITION_JMX_STR = ""; + + // Metric Name Delimiter + public static final String METRIC_NAME_DELIMITER = "."; + private static final String ALLOCATED_RESOURCE_METRIC_PREFIX = "AllocatedResource."; private static final String ALLOCATED_RESOURCE_METRIC_DESC = @@ -150,13 +167,17 @@ public class QueueMetrics implements MetricsSource { private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = "Aggregate Preempted Seconds for NAME"; - protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, - boolean enableUserMetrics, Configuration conf) { + public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, + boolean enableUserMetrics, Configuration conf) { + registry = new MetricsRegistry(RECORD_INFO); this.queueName = queueName; + this.parent = parent != null ? parent.getMetrics() : null; - this.users = enableUserMetrics ? new HashMap() - : null; + this.parentQueue = parent; + this.users = enableUserMetrics ? new HashMap() : null; + this.enableUserMetrics = enableUserMetrics; + metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); @@ -178,12 +199,25 @@ public class QueueMetrics implements MetricsSource { return sb; } - public synchronized - static QueueMetrics forQueue(String queueName, Queue parent, - boolean enableUserMetrics, - Configuration conf) { + static StringBuilder pSourceName(String partition) { + StringBuilder sb = new StringBuilder(P_RECORD_INFO.name()); + sb.append(",partition").append('=').append(partition); + return sb; + } + + static StringBuilder qSourceName(String queueName) { + StringBuilder sb = new StringBuilder(); + int i = 0; + for (String node : Q_SPLITTER.split(queueName)) { + sb.append(",q").append(i++).append('=').append(node); + } + return sb; + } + + public synchronized static QueueMetrics forQueue(String queueName, + Queue parent, boolean enableUserMetrics, Configuration conf) { return forQueue(DefaultMetricsSystem.instance(), queueName, parent, - enableUserMetrics, conf); + enableUserMetrics, conf); } /** @@ -205,28 +239,24 @@ public class QueueMetrics implements MetricsSource { * * @return A string to {@link QueueMetrics} map. */ - protected static Map getQueueMetrics() { + public static Map getQueueMetrics() { return QUEUE_METRICS; } - public synchronized - static QueueMetrics forQueue(MetricsSystem ms, String queueName, - Queue parent, boolean enableUserMetrics, - Configuration conf) { - QueueMetrics metrics = QUEUE_METRICS.get(queueName); + public synchronized static QueueMetrics forQueue(MetricsSystem ms, + String queueName, Queue parent, boolean enableUserMetrics, + Configuration conf) { + QueueMetrics metrics = getQueueMetrics().get(queueName); if (metrics == null) { - metrics = - new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf). - tag(QUEUE_INFO, queueName); - + metrics = new QueueMetrics(ms, queueName, parent, enableUserMetrics, conf) + .tag(QUEUE_INFO, queueName); + // Register with the MetricsSystems if (ms != null) { - metrics = - ms.register( - sourceName(queueName).toString(), - "Metrics for queue: " + queueName, metrics); + metrics = ms.register(sourceName(queueName).toString(), + "Metrics for queue: " + queueName, metrics); } - QUEUE_METRICS.put(queueName, metrics); + getQueueMetrics().put(queueName, metrics); } return metrics; @@ -238,7 +268,8 @@ public class QueueMetrics implements MetricsSource { } QueueMetrics metrics = users.get(userName); if (metrics == null) { - metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new QueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), @@ -248,6 +279,96 @@ public class QueueMetrics implements MetricsSource { return metrics; } + /** + * Partition * Queue Metrics + * + * Computes Metrics at Partition (Node Label) * Queue Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * QueueMetrics (A) + * metrics + * QueueMetrics (A1) + * metrics + * QueueMetrics (A2) + * metrics + * QueueMetrics (B) + * metrics + * + * @param partition + * @return QueueMetrics + */ + public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { + + String partitionJMXStr = partition; + + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + partitionJMXStr = DEFAULT_PARTITION_JMX_STR; + } + + String metricName = partition + METRIC_NAME_DELIMITER + this.queueName; + QueueMetrics metrics = getQueueMetrics().get(metricName); + + if (metrics == null) { + QueueMetrics queueMetrics = + new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue, + this.enableUserMetrics, this.conf, partition); + metricsSystem.register( + pSourceName(partitionJMXStr).append(qSourceName(this.queueName)) + .toString(), + "Metrics for queue: " + this.queueName, + queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO, + this.queueName)); + getQueueMetrics().put(metricName, queueMetrics); + return queueMetrics; + } else { + return metrics; + } + } + + /** + * Partition Metrics + * + * Computes Metrics at Partition (Node Label) Level. + * + * Sample JMX O/P Structure: + * + * PartitionQueueMetrics (labelX) + * metrics + * + * @param partition + * @return QueueMetrics + */ + private QueueMetrics getPartitionMetrics(String partition) { + + String partitionJMXStr = partition; + if ((partition == null) + || (partition.equals(RMNodeLabelsManager.NO_LABEL))) { + partition = DEFAULT_PARTITION; + partitionJMXStr = DEFAULT_PARTITION_JMX_STR; + } + + String metricName = partition + METRIC_NAME_DELIMITER; + QueueMetrics metrics = getQueueMetrics().get(metricName); + if (metrics == null) { + metrics = new PartitionQueueMetrics(metricsSystem, this.queueName, null, + false, this.conf, partition); + + // Register with the MetricsSystems + if (metricsSystem != null) { + metricsSystem.register(pSourceName(partitionJMXStr).toString(), + "Metrics for partition: " + partitionJMXStr, + (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, + partitionJMXStr)); + } + getQueueMetrics().put(metricName, metrics); + } + return metrics; + } + private ArrayList parseInts(String value) { ArrayList result = new ArrayList(); for(String s: value.split(",")) { @@ -388,20 +509,42 @@ public class QueueMetrics implements MetricsSource { */ public void setAvailableResourcesToQueue(String partition, Resource limit) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.setAvailable(limit); - registerCustomResources( - queueMetricsForCustomResources.getAvailableValues(), - AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); + setAvailableResources(limit); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.setAvailableResources(limit); + + if(this.queueName.equals("root")) { + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.setAvailableResources(limit); + } } } } + /** + * Set Available resources with support for resource vectors. + * + * @param limit + */ + public void setAvailableResources(Resource limit) { + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.setAvailable(limit); + registerCustomResources( + queueMetricsForCustomResources.getAvailableValues(), + AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); + } + } + /** * Set available resources. To be called by scheduler periodically as * resources become available. + * * @param limit resource limit */ public void setAvailableResourcesToQueue(Resource limit) { @@ -411,42 +554,70 @@ public class QueueMetrics implements MetricsSource { /** * Set available resources. To be called by scheduler periodically as * resources become available. + * * @param partition Node Partition * @param user * @param limit resource limit */ - public void setAvailableResourcesToUser(String partition, - String user, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + public void setAvailableResourcesToUser(String partition, String user, + Resource limit) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(partition, limit); + userMetrics.setAvailableResources(limit); + } + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + QueueMetrics partitionUserMetrics = + partitionQueueMetrics.getUserMetrics(user); + if (partitionUserMetrics != null) { + partitionUserMetrics.setAvailableResources(limit); } } } /** * Increment pending resource metrics + * * @param partition Node Partition * @param user * @param containers - * @param res the TOTAL delta of resources note this is different from - * the other APIs which use per container resource + * @param res the TOTAL delta of resources note this is different from the + * other APIs which use per container resource */ public void incrPendingResources(String partition, String user, int containers, Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(partition, user, containers, res); + internalIncrPendingResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalIncrPendingResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.incrementPendingResources(containers, res); } } } + public void internalIncrPendingResources(String partition, String user, + int containers, Resource res) { + incrementPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalIncrPendingResources(partition, user, containers, + res); + } + if (parent != null) { + parent.internalIncrPendingResources(partition, user, containers, res); + } + } + protected Map initAndGetCustomResources() { Map customResources = new HashMap(); ResourceInformation[] resources = ResourceUtils.getResourceTypesArray(); @@ -505,7 +676,7 @@ public class QueueMetrics implements MetricsSource { } } - private void _incrPendingResources(int containers, Resource res) { + private void incrementPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); @@ -518,19 +689,36 @@ public class QueueMetrics implements MetricsSource { public void decrPendingResources(String partition, String user, int containers, Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - _decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(partition, user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(partition, user, containers, res); + internalDecrPendingResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalDecrPendingResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.decrementPendingResources(containers, res); } } } - private void _decrPendingResources(int containers, Resource res) { + protected void internalDecrPendingResources(String partition, String user, + int containers, Resource res) { + decrementPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalDecrPendingResources(partition, user, containers, + res); + } + if (parent != null) { + parent.internalDecrPendingResources(partition, user, containers, res); + } + } + + private void decrementPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); pendingVCores.decr(res.getVirtualCores() * containers); @@ -560,32 +748,59 @@ public class QueueMetrics implements MetricsSource { } } - public void allocateResources(String partition, String user, - int containers, Resource res, boolean decrPending) { + public void allocateResources(String partition, String user, int containers, + Resource res, boolean decrPending) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); + internalAllocateResources(partition, user, containers, res, decrPending); + } - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res, containers); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalAllocateResources(partition, user, + containers, res, decrPending); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.computeAllocateResources(containers, res, decrPending); } + } + } - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, - containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(partition, user, containers, res, decrPending); - } + public void internalAllocateResources(String partition, String user, + int containers, Resource res, boolean decrPending) { + computeAllocateResources(containers, res, decrPending); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalAllocateResources(partition, user, containers, res, + decrPending); + } + if (parent != null) { + parent.internalAllocateResources(partition, user, containers, res, + decrPending); + } + } + + /** + * Allocate Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc + * @param decrPending decides whether to decrease pending resource or not + */ + private void computeAllocateResources(int containers, Resource res, + boolean decrPending) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); + } + if (decrPending) { + decrementPendingResources(containers, res); } } @@ -596,81 +811,79 @@ public class QueueMetrics implements MetricsSource { * @param res */ public void allocateResources(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.increaseAllocated(res); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } - - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreasePending(res); - registerCustomResources( - queueMetricsForCustomResources.getPendingValues(), - PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); - } - - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(partition, user, res); - } - if (parent != null) { - parent.allocateResources(partition, user, res); - } - } - } - - public void releaseResources(String partition, - String user, int containers, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res, containers); - registerCustomResources( - queueMetricsForCustomResources.getAllocatedValues(), - ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); - } - - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(partition, user, containers, res); - } - if (parent != null) { - parent.releaseResources(partition, user, containers, res); - } - } - } - - /** - * Release Resource for container size change. - * - * @param user - * @param res - */ - private void releaseResources(String user, Resource res) { - allocatedMB.decr(res.getMemorySize()); - allocatedVCores.decr(res.getVirtualCores()); + allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { - queueMetricsForCustomResources.decreaseAllocated(res); + queueMetricsForCustomResources.increaseAllocated(res); registerCustomResources( queueMetricsForCustomResources.getAllocatedValues(), ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreasePending(res); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); + } + QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { - userMetrics.releaseResources(user, res); + userMetrics.allocateResources(partition, user, res); } if (parent != null) { - parent.releaseResources(user, res); + parent.allocateResources(partition, user, res); + } + } + + public void releaseResources(String partition, String user, int containers, + Resource res) { + + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalReleaseResources(partition, user, containers, res); + } + + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalReleaseResources(partition, user, + containers, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.computeReleaseResources(containers, res); + } + } + } + + public void internalReleaseResources(String partition, String user, + int containers, Resource res) { + + computeReleaseResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalReleaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.internalReleaseResources(partition, user, containers, res); + } + } + + /** + * Release Resources for a partition with support for resource vectors. + * + * @param containers number of containers + * @param res resource containing memory size, vcores etc + */ + private void computeReleaseResources(int containers, Resource res) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } } @@ -721,11 +934,31 @@ public class QueueMetrics implements MetricsSource { public void reserveResource(String partition, String user, Resource res) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - reserveResource(user, res); + internalReserveResources(partition, user, res); + } + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalReserveResources(partition, user, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.incrReserveResources(res); + } } } - public void reserveResource(String user, Resource res) { + protected void internalReserveResources(String partition, String user, + Resource res) { + incrReserveResources(res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalReserveResources(partition, user, res); + } + if (parent != null) { + parent.internalReserveResources(partition, user, res); + } + } + + public void incrReserveResources(Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); @@ -735,17 +968,37 @@ public class QueueMetrics implements MetricsSource { queueMetricsForCustomResources.getReservedValues(), RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.reserveResource(user, res); + } + + public void unreserveResource(String partition, String user, Resource res) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + internalUnReserveResources(partition, user, res); } - if (parent != null) { - parent.reserveResource(user, res); + QueueMetrics partitionQueueMetrics = getPartitionQueueMetrics(partition); + if (partitionQueueMetrics != null) { + partitionQueueMetrics.internalUnReserveResources(partition, user, res); + QueueMetrics partitionMetrics = getPartitionMetrics(partition); + if (partitionMetrics != null) { + partitionMetrics.decrReserveResource(res); + } } } - private void unreserveResource(String user, Resource res) { - reservedContainers.decr(); + protected void internalUnReserveResources(String partition, String user, + Resource res) { + decrReserveResource(res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.internalUnReserveResources(partition, user, res); + } + if (parent != null) { + parent.internalUnReserveResources(partition, user, res); + } + } + + public void decrReserveResource(Resource res) { + int containers = 1; + reservedContainers.decr(containers); reservedMB.decr(res.getMemorySize()); reservedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { @@ -754,19 +1007,6 @@ public class QueueMetrics implements MetricsSource { queueMetricsForCustomResources.getReservedValues(), RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.unreserveResource(user, res); - } - if (parent != null) { - parent.unreserveResource(user, res); - } - } - - public void unreserveResource(String partition, String user, Resource res) { - if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { - unreserveResource(user, res); - } } public void incrActiveUsers() { @@ -980,4 +1220,12 @@ public class QueueMetrics implements MetricsSource { QueueMetricsForCustomResources metrics) { this.queueMetricsForCustomResources = metrics; } -} + + public void setParent(QueueMetrics parent) { + this.parent = parent; + } + + public Queue getParentQueue() { + return parentQueue; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index e9a0aafe6ee..106f56580da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -229,7 +229,7 @@ public class CSQueueMetrics extends QueueMetrics { public synchronized static CSQueueMetrics forQueue(String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { MetricsSystem ms = DefaultMetricsSystem.instance(); - QueueMetrics metrics = QueueMetrics.getQueueMetrics().get(queueName); + QueueMetrics metrics = getQueueMetrics().get(queueName); if (metrics == null) { metrics = new CSQueueMetrics(ms, queueName, parent, enableUserMetrics, conf) @@ -241,7 +241,7 @@ public class CSQueueMetrics extends QueueMetrics { ms.register(sourceName(queueName).toString(), "Metrics for queue: " + queueName, metrics); } - QueueMetrics.getQueueMetrics().put(queueName, metrics); + getQueueMetrics().put(queueName, metrics); } return (CSQueueMetrics) metrics; @@ -254,7 +254,8 @@ public class CSQueueMetrics extends QueueMetrics { } CSQueueMetrics metrics = (CSQueueMetrics) users.get(userName); if (metrics == null) { - metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); + metrics = + new CSQueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index e12034e6567..67573400378 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1403,8 +1403,9 @@ public class LeafQueue extends AbstractCSQueue { : getQueueMaxResource(partition); Resource headroom = Resources.componentwiseMin( - Resources.subtract(userLimitResource, user.getUsed(partition)), - Resources.subtract(currentPartitionResourceLimit, + Resources.subtractNonNegative(userLimitResource, + user.getUsed(partition)), + Resources.subtractNonNegative(currentPartitionResourceLimit, queueUsage.getUsed(partition))); // Normalize it before return headroom = @@ -1718,11 +1719,16 @@ public class LeafQueue extends AbstractCSQueue { User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, true); - // Note this is a bit unconventional since it gets the object and modifies - // it here, rather then using set routine - Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = getHeadroom(user, + cachedResourceLimitsForHeadroom.getLimit(), clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUser(nodePartition, userName, + partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " user=" + userName + " used=" @@ -1761,8 +1767,16 @@ public class LeafQueue extends AbstractCSQueue { User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, false); - metrics.setAvailableResourcesToUser(nodePartition, - userName, application.getHeadroom()); + Resource partitionHeadroom = Resources.createResource(0, 0); + if (metrics.getUserMetrics(userName) != null) { + partitionHeadroom = getHeadroom(user, + cachedResourceLimitsForHeadroom.getLimit(), clusterResource, + getResourceLimitForActiveUsers(userName, clusterResource, + nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), + nodePartition); + } + metrics.setAvailableResourcesToUser(nodePartition, userName, + partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 9ca9bd022f4..b621fb87ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -616,7 +616,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(), schedulerContainer.getSchedulerRequestKey(), - schedulerContainer.getRmContainer().getContainer()); + schedulerContainer.getRmContainer()); ((RMContainerImpl) rmContainer).setContainerRequest( containerRequest); @@ -630,7 +630,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { AppSchedulingInfo.updateMetrics(getApplicationId(), allocation.getAllocationLocalityType(), schedulerContainer.getSchedulerNode(), - schedulerContainer.getRmContainer().getContainer(), getUser(), + schedulerContainer.getRmContainer(), getUser(), getQueue()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index 2e564ca0dea..47b57bece96 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -463,7 +463,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt liveContainers.put(container.getId(), rmContainer); // Update consumption and track allocations ContainerRequest containerRequest = appSchedulingInfo.allocate( - type, node, schedulerKey, container); + type, node, schedulerKey, rmContainer); this.attemptResourceUsage.incUsed(container.getResource()); getQueue().incUsedResource(container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java index 422f6db3044..a4db2d8ecd1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoAppAttempt.java @@ -83,7 +83,7 @@ public class FifoAppAttempt extends FiCaSchedulerApp { // Update consumption and track allocations ContainerRequest containerRequest = appSchedulingInfo.allocate( - type, node, schedulerKey, container); + type, node, schedulerKey, rmContainer); attemptResourceUsage.incUsed(node.getPartition(), container.getResource()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java new file mode 100644 index 00000000000..eb240d1b6d3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestPartitionQueueMetrics.java @@ -0,0 +1,752 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TestPartitionQueueMetrics { + + static final int GB = 1024; // MB + private static final Configuration CONF = new Configuration(); + + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + PartitionQueueMetrics.clearQueueMetrics(); + } + + @After + public void tearDown() { + ms.shutdown(); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in only 1 partition, x. + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + + @Test + public void testSinglePartitionWithSingleLevelQueueMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue(ms, "root", null, true, CONF); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF); + QueueMetrics q2 = + QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + q1.setAvailableResourcesToQueue("x", + Resources.createResource(100 * GB, 100)); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + + q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = queueSource(ms, "x", "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in both partitions, x & y. + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test + public void testTwoPartitionWithSingleLevelQueueMetrics() throws Exception { + + String parentQueueName = "root"; + String user = "alice"; + + QueueMetrics root = + QueueMetrics.forQueue(ms, parentQueueName, null, false, CONF); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, false, CONF); + QueueMetrics q2 = + QueueMetrics.forQueue(ms, "root.q2", parentQueue, false, CONF); + + AppSchedulingInfo app = mockApp(user); + q1.submitApp(user); + q1.submitAppAttempt(user); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + q1.setAvailableResourcesToQueue("x", + Resources.createResource(100 * GB, 100)); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource xPartitionSource = partitionSource(ms, "x"); + MetricsSource xRootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + + checkResources(xPartitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(xRootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + + root.setAvailableResourcesToQueue("y", + Resources.createResource(400 * GB, 400)); + q2.setAvailableResourcesToQueue("y", + Resources.createResource(200 * GB, 200)); + + q2.incrPendingResources("y", user, 3, Resource.newInstance(1024, 1)); + + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q2Source = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 400 * GB, 400, 3 * GB, 3, 3); + checkResources(q2Source, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3); + } + + /** + * Structure: + * Both queues, q1 has been configured to run in multiple partitions, x & y. + * + * root + * / + * q1 + * + * @throws Exception + */ + @Test + public void testMultiplePartitionWithSingleQueueMetrics() throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + root.setAvailableResourcesToQueue("y", + Resources.createResource(300 * GB, 300)); + + q1.incrPendingResources("x", "test_user", 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + + q1.incrPendingResources("x", "test_user", 3, Resource.newInstance(1024, 1)); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 5 * GB, 5, 5); + checkResources(q1Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + checkResources(userSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + + q1.incrPendingResources("x", "test_user1", 4, + Resource.newInstance(1024, 1)); + MetricsSource userSource1 = userSource(ms, "x", "test_user1", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 9 * GB, 9, 9); + checkResources(q1Source, 0, 0, 0, 0, 0, 9 * GB, 9, 9); + checkResources(userSource1, 0, 0, 0, 0, 0, 4 * GB, 4, 4); + + q1.incrPendingResources("y", "test_user1", 6, + Resource.newInstance(1024, 1)); + MetricsSource partitionSourceY = partitionSource(ms, "y"); + MetricsSource rootQueueSourceY = queueSource(ms, "y", parentQueueName); + MetricsSource q1SourceY = queueSource(ms, "y", "root.q1"); + MetricsSource userSourceY = userSource(ms, "y", "test_user1", "root.q1"); + + checkResources(partitionSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6); + checkResources(rootQueueSourceY, 0, 0, 0, 300 * GB, 300, 6 * GB, 6, 6); + checkResources(q1SourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6); + checkResources(userSourceY, 0, 0, 0, 0, 0, 6 * GB, 6, 6); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in both partitions, x & y. + * + * root + * / \ + * q1 q2 + * q1 + * / \ + * q11 q12 + * q2 + * / \ + * q21 q22 + * + * @throws Exception + */ + + @Test + public void testMultiplePartitionsWithMultiLevelQueuesMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + + QueueMetrics root = + QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + when(parentQueue.getMetrics()).thenReturn(root); + + QueueMetrics q1 = + QueueMetrics.forQueue(ms, "root.q1", parentQueue, true, CONF); + Queue childQueue1 = mock(Queue.class); + when(childQueue1.getQueueName()).thenReturn("root.q1"); + when(childQueue1.getMetrics()).thenReturn(q1); + + QueueMetrics q11 = + QueueMetrics.forQueue(ms, "root.q1.q11", childQueue1, true, CONF); + QueueMetrics q12 = + QueueMetrics.forQueue(ms, "root.q1.q12", childQueue1, true, CONF); + + QueueMetrics q2 = + QueueMetrics.forQueue(ms, "root.q2", parentQueue, true, CONF); + Queue childQueue2 = mock(Queue.class); + when(childQueue2.getQueueName()).thenReturn("root.q2"); + when(childQueue2.getMetrics()).thenReturn(q2); + + QueueMetrics q21 = + QueueMetrics.forQueue(ms, "root.q2.q21", childQueue2, true, CONF); + QueueMetrics q22 = + QueueMetrics.forQueue(ms, "root.q2.q22", childQueue2, true, CONF); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + + q1.setAvailableResourcesToQueue("x", + Resources.createResource(100 * GB, 100)); + q11.setAvailableResourcesToQueue("x", + Resources.createResource(50 * GB, 50)); + + q11.incrPendingResources("x", "test_user", 2, + Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(ms, "x"); + MetricsSource rootQueueSource = queueSource(ms, "x", parentQueueName); + MetricsSource q1Source = queueSource(ms, "x", "root.q1"); + MetricsSource userSource = userSource(ms, "x", "test_user", "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 2 * GB, 2, 2); + checkResources(userSource, 0, 0, 0, 0 * GB, 0, 2 * GB, 2, 2); + + q11.incrPendingResources("x", "test_user", 4, + Resource.newInstance(1024, 1)); + + MetricsSource q11Source = queueSource(ms, "x", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 6 * GB, 6, 6); + checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 6 * GB, 6, 6); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 6 * GB, 6, 6); + checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6); + + q11.incrPendingResources("x", "test_user1", 5, + Resource.newInstance(1024, 1)); + + MetricsSource q1UserSource1 = userSource(ms, "x", "test_user1", "root.q1"); + MetricsSource userSource1 = + userSource(ms, "x", "test_user1", "root.q1.q11"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 11 * GB, 11, 11); + checkResources(q11Source, 0, 0, 0, 50 * GB, 50, 11 * GB, 11, 11); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 11 * GB, 11, 11); + checkResources(userSource, 0, 0, 0, 0 * GB, 0, 6 * GB, 6, 6); + checkResources(q1UserSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5); + checkResources(userSource1, 0, 0, 0, 0 * GB, 0, 5 * GB, 5, 5); + + q12.incrPendingResources("x", "test_user", 5, + Resource.newInstance(1024, 1)); + MetricsSource q12Source = queueSource(ms, "x", "root.q1.q12"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 16 * GB, 16, 16); + checkResources(q1Source, 0, 0, 0, 100 * GB, 100, 16 * GB, 16, 16); + checkResources(q12Source, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + + root.setAvailableResourcesToQueue("y", + Resources.createResource(200 * GB, 200)); + q1.setAvailableResourcesToQueue("y", + Resources.createResource(100 * GB, 100)); + q12.setAvailableResourcesToQueue("y", + Resources.createResource(50 * GB, 50)); + + q12.incrPendingResources("y", "test_user", 3, + Resource.newInstance(1024, 1)); + + MetricsSource yPartitionSource = partitionSource(ms, "y"); + MetricsSource yRootQueueSource = queueSource(ms, "y", parentQueueName); + MetricsSource q1YSource = queueSource(ms, "y", "root.q1"); + MetricsSource q12YSource = queueSource(ms, "y", "root.q1.q12"); + + checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3); + checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 3 * GB, 3, 3); + checkResources(q1YSource, 0, 0, 0, 100 * GB, 100, 3 * GB, 3, 3); + checkResources(q12YSource, 0, 0, 0, 50 * GB, 50, 3 * GB, 3, 3); + + root.setAvailableResourcesToQueue("y", + Resources.createResource(200 * GB, 200)); + q2.setAvailableResourcesToQueue("y", + Resources.createResource(100 * GB, 100)); + q21.setAvailableResourcesToQueue("y", + Resources.createResource(50 * GB, 50)); + + q21.incrPendingResources("y", "test_user", 5, + Resource.newInstance(1024, 1)); + MetricsSource q21Source = queueSource(ms, "y", "root.q2.q21"); + MetricsSource q2YSource = queueSource(ms, "y", "root.q2"); + + checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8); + checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 8 * GB, 8, 8); + checkResources(q2YSource, 0, 0, 0, 100 * GB, 100, 5 * GB, 5, 5); + checkResources(q21Source, 0, 0, 0, 50 * GB, 50, 5 * GB, 5, 5); + + q22.incrPendingResources("y", "test_user", 6, + Resource.newInstance(1024, 1)); + MetricsSource q22Source = queueSource(ms, "y", "root.q2.q22"); + + checkResources(yPartitionSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14); + checkResources(yRootQueueSource, 0, 0, 0, 200 * GB, 200, 14 * GB, 14, 14); + checkResources(q22Source, 0, 0, 0, 0, 0, 6 * GB, 6, 6); + } + + @Test + public void testTwoLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String user = "alice"; + String partition = "x"; + + QueueMetrics parentMetrics = + QueueMetrics.forQueue(ms, parentQueueName, null, true, CONF); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + QueueMetrics metrics = + QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, CONF); + AppSchedulingInfo app = mockApp(user); + + metrics.submitApp(user); + metrics.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100 * GB, 100)); + metrics.setAvailableResourcesToQueue(partition, + Resources.createResource(100 * GB, 100)); + parentMetrics.setAvailableResourcesToUser(partition, user, + Resources.createResource(10 * GB, 10)); + metrics.setAvailableResourcesToUser(partition, user, + Resources.createResource(10 * GB, 10)); + metrics.incrPendingResources(partition, user, 6, + Resources.createResource(3 * GB, 3)); + + MetricsSource partitionSource = partitionSource(ms, partition); + MetricsSource parentQueueSource = + queueSource(ms, partition, parentQueueName); + MetricsSource queueSource = queueSource(ms, partition, leafQueueName); + MetricsSource userSource = userSource(ms, partition, user, leafQueueName); + MetricsSource userSource1 = + userSource(ms, partition, user, parentQueueName); + + checkResources(queueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, 0, + 0, 0); + checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, + 6, 0, 0, 0); + checkResources(userSource, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, 0, + 0); + checkResources(userSource1, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, + 0, 0); + checkResources(partitionSource, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, + 6, 0, 0, 0); + + metrics.runAppAttempt(app.getApplicationId(), user); + + metrics.allocateResources(partition, user, 3, + Resources.createResource(1 * GB, 1), true); + metrics.reserveResource(partition, user, + Resources.createResource(3 * GB, 3)); + + // Available resources is set externally, as it depends on dynamic + // configurable cluster/queue resources + checkResources(queueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, 15, + 3, 3 * GB, 3, 1); + checkResources(parentQueueSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, + 15 * GB, 15, 3, 3 * GB, 3, 1); + checkResources(partitionSource, 3 * GB, 3, 3, 3, 0, 100 * GB, 100, 15 * GB, + 15, 3, 3 * GB, 3, 1); + checkResources(userSource, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3, + 3 * GB, 3, 1); + checkResources(userSource1, 3 * GB, 3, 3, 3, 0, 10 * GB, 10, 15 * GB, 15, 3, + 3 * GB, 3, 1); + + metrics.allocateResources(partition, user, 3, + Resources.createResource(1 * GB, 1), true); + + checkResources(queueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, 12 * GB, 12, + 0, 3 * GB, 3, 1); + checkResources(parentQueueSource, 6 * GB, 6, 6, 6, 0, 100 * GB, 100, + 12 * GB, 12, 0, 3 * GB, 3, 1); + + metrics.releaseResources(partition, user, 1, + Resources.createResource(2 * GB, 2)); + metrics.unreserveResource(partition, user, + Resources.createResource(3 * GB, 3)); + checkResources(queueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, 12, + 0, 0, 0, 0); + checkResources(parentQueueSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, + 12 * GB, 12, 0, 0, 0, 0); + checkResources(partitionSource, 4 * GB, 4, 5, 6, 1, 100 * GB, 100, 12 * GB, + 12, 0, 0, 0, 0); + checkResources(userSource, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0, + 0, 0, 0); + checkResources(userSource1, 4 * GB, 4, 5, 6, 1, 10 * GB, 10, 12 * GB, 12, 0, + 0, 0, 0); + + metrics.finishAppAttempt(app.getApplicationId(), app.isPending(), + app.getUser()); + + metrics.finishApp(user, RMAppState.FINISHED); + } + + @Test + public void testThreeLevelWithUserMetrics() { + String parentQueueName = "root"; + String leafQueueName = "root.leaf"; + String leafQueueName1 = "root.leaf.leaf1"; + String user = "alice"; + String partitionX = "x"; + String partitionY = "y"; + + QueueMetrics parentMetrics = + QueueMetrics.forQueue(parentQueueName, null, true, CONF); + Queue parentQueue = mock(Queue.class); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + when(parentQueue.getMetrics()).thenReturn(parentMetrics); + QueueMetrics metrics = + QueueMetrics.forQueue(leafQueueName, parentQueue, true, CONF); + Queue leafQueue = mock(Queue.class); + when(leafQueue.getQueueName()).thenReturn(leafQueueName); + when(leafQueue.getMetrics()).thenReturn(metrics); + QueueMetrics metrics1 = + QueueMetrics.forQueue(leafQueueName1, leafQueue, true, CONF); + AppSchedulingInfo app = mockApp(user); + + metrics1.submitApp(user); + metrics1.submitAppAttempt(user); + + parentMetrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(200 * GB, 200)); + parentMetrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(500 * GB, 500)); + metrics.setAvailableResourcesToQueue(partitionX, + Resources.createResource(100 * GB, 100)); + metrics.setAvailableResourcesToQueue(partitionY, + Resources.createResource(400 * GB, 400)); + metrics1.setAvailableResourcesToQueue(partitionX, + Resources.createResource(50 * GB, 50)); + metrics1.setAvailableResourcesToQueue(partitionY, + Resources.createResource(300 * GB, 300)); + parentMetrics.setAvailableResourcesToUser(partitionX, user, + Resources.createResource(20 * GB, 20)); + parentMetrics.setAvailableResourcesToUser(partitionY, user, + Resources.createResource(50 * GB, 50)); + metrics.setAvailableResourcesToUser(partitionX, user, + Resources.createResource(10 * GB, 10)); + metrics.setAvailableResourcesToUser(partitionY, user, + Resources.createResource(40 * GB, 40)); + metrics1.setAvailableResourcesToUser(partitionX, user, + Resources.createResource(5 * GB, 5)); + metrics1.setAvailableResourcesToUser(partitionY, user, + Resources.createResource(30 * GB, 30)); + metrics1.incrPendingResources(partitionX, user, 6, + Resources.createResource(3 * GB, 3)); + metrics1.incrPendingResources(partitionY, user, 6, + Resources.createResource(4 * GB, 4)); + + MetricsSource partitionSourceX = + partitionSource(metrics1.getMetricsSystem(), partitionX); + + MetricsSource parentQueueSourceWithPartX = + queueSource(metrics1.getMetricsSystem(), partitionX, parentQueueName); + MetricsSource queueSourceWithPartX = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName); + MetricsSource queueSource1WithPartX = + queueSource(metrics1.getMetricsSystem(), partitionX, leafQueueName1); + MetricsSource parentUserSourceWithPartX = userSource(metrics1.getMetricsSystem(), + partitionX, user, parentQueueName); + MetricsSource userSourceWithPartX = userSource(metrics1.getMetricsSystem(), + partitionX, user, leafQueueName); + MetricsSource userSource1WithPartX = userSource(metrics1.getMetricsSystem(), + partitionX, user, leafQueueName1); + + checkResources(partitionSourceX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, 18, + 6, 0, 0, 0); + checkResources(parentQueueSourceWithPartX, 0, 0, 0, 0, 0, 200 * GB, 200, 18 * GB, + 18, 6, 0, 0, 0); + + checkResources(queueSourceWithPartX, 0, 0, 0, 0, 0, 100 * GB, 100, 18 * GB, 18, 6, + 0, 0, 0); + checkResources(queueSource1WithPartX, 0, 0, 0, 0, 0, 50 * GB, 50, 18 * GB, 18, 6, + 0, 0, 0); + checkResources(parentUserSourceWithPartX, 0, 0, 0, 0, 0, 20 * GB, 20, 18 * GB, 18, + 6, 0, 0, 0); + checkResources(userSourceWithPartX, 0, 0, 0, 0, 0, 10 * GB, 10, 18 * GB, 18, 6, 0, + 0, 0); + checkResources(userSource1WithPartX, 0, 0, 0, 0, 0, 5 * GB, 5, 18 * GB, 18, 6, 0, + 0, 0); + + MetricsSource partitionSourceY = + partitionSource(metrics1.getMetricsSystem(), partitionY); + + MetricsSource parentQueueSourceWithPartY = + queueSource(metrics1.getMetricsSystem(), partitionY, parentQueueName); + MetricsSource queueSourceWithPartY = + queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName); + MetricsSource queueSource1WithPartY = + queueSource(metrics1.getMetricsSystem(), partitionY, leafQueueName1); + MetricsSource parentUserSourceWithPartY = userSource(metrics1.getMetricsSystem(), + partitionY, user, parentQueueName); + MetricsSource userSourceWithPartY = userSource(metrics1.getMetricsSystem(), + partitionY, user, leafQueueName); + MetricsSource userSource1WithPartY = userSource(metrics1.getMetricsSystem(), + partitionY, user, leafQueueName1); + + checkResources(partitionSourceY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, 24, + 6, 0, 0, 0); + checkResources(parentQueueSourceWithPartY, 0, 0, 0, 0, 0, 500 * GB, 500, 24 * GB, + 24, 6, 0, 0, 0); + checkResources(queueSourceWithPartY, 0, 0, 0, 0, 0, 400 * GB, 400, 24 * GB, 24, 6, + 0, 0, 0); + checkResources(queueSource1WithPartY, 0, 0, 0, 0, 0, 300 * GB, 300, 24 * GB, 24, 6, + 0, 0, 0); + checkResources(parentUserSourceWithPartY, 0, 0, 0, 0, 0, 50 * GB, 50, 24 * GB, 24, + 6, 0, 0, 0); + checkResources(userSourceWithPartY, 0, 0, 0, 0, 0, 40 * GB, 40, 24 * GB, 24, 6, 0, + 0, 0); + checkResources(userSource1WithPartY, 0, 0, 0, 0, 0, 30 * GB, 30, 24 * GB, 24, 6, 0, + 0, 0); + + metrics1.finishAppAttempt(app.getApplicationId(), app.isPending(), + app.getUser()); + + metrics1.finishApp(user, RMAppState.FINISHED); + } + + /** + * Structure: + * Both queues, q1 & q2 has been configured to run in only 1 partition, x + * UserMetrics has been disabled, hence trying to access the user source + * throws NPE from sources. + * + * root + * / \ + * q1 q2 + * + * @throws Exception + */ + @Test(expected = NullPointerException.class) + public void testSinglePartitionWithSingleLevelQueueMetricsWithoutUserMetrics() + throws Exception { + + String parentQueueName = "root"; + Queue parentQueue = mock(Queue.class); + String user = "alice"; + + QueueMetrics root = QueueMetrics.forQueue("root", null, false, CONF); + when(parentQueue.getMetrics()).thenReturn(root); + when(parentQueue.getQueueName()).thenReturn(parentQueueName); + CSQueueMetrics q1 = + CSQueueMetrics.forQueue("root.q1", parentQueue, false, CONF); + CSQueueMetrics q2 = + CSQueueMetrics.forQueue("root.q2", parentQueue, false, CONF); + + AppSchedulingInfo app = mockApp(user); + + q1.submitApp(user); + q1.submitAppAttempt(user); + + root.setAvailableResourcesToQueue("x", + Resources.createResource(200 * GB, 200)); + + q1.incrPendingResources("x", user, 2, Resource.newInstance(1024, 1)); + + MetricsSource partitionSource = partitionSource(q1.getMetricsSystem(), "x"); + MetricsSource rootQueueSource = + queueSource(q1.getMetricsSystem(), "x", parentQueueName); + MetricsSource q1Source = queueSource(q1.getMetricsSystem(), "x", "root.q1"); + MetricsSource q1UserSource = + userSource(q1.getMetricsSystem(), "x", user, "root.q1"); + + checkResources(partitionSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(rootQueueSource, 0, 0, 0, 200 * GB, 200, 2 * GB, 2, 2); + checkResources(q1Source, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + checkResources(q1UserSource, 0, 0, 0, 0, 0, 2 * GB, 2, 2); + + q2.incrPendingResources("x", user, 3, Resource.newInstance(1024, 1)); + MetricsSource q2Source = queueSource(q2.getMetricsSystem(), "x", "root.q2"); + MetricsSource q2UserSource = + userSource(q1.getMetricsSystem(), "x", user, "root.q2"); + + checkResources(partitionSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + checkResources(rootQueueSource, 0, 0, 0, 0, 0, 5 * GB, 5, 5); + checkResources(q2Source, 0, 0, 0, 0, 0, 3 * GB, 3, 3); + checkResources(q2UserSource, 0, 0, 0, 0, 0, 3 * GB, 3, 3); + + q1.finishAppAttempt(app.getApplicationId(), app.isPending(), app.getUser()); + q1.finishApp(user, RMAppState.FINISHED); + } + + public static MetricsSource partitionSource(MetricsSystem ms, + String partition) { + MetricsSource s = + ms.getSource(QueueMetrics.pSourceName(partition).toString()); + return s; + } + + public static MetricsSource queueSource(MetricsSystem ms, String partition, + String queue) { + MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition) + .append(QueueMetrics.qSourceName(queue)).toString()); + return s; + } + + public static MetricsSource userSource(MetricsSystem ms, String partition, + String user, String queue) { + MetricsSource s = ms.getSource(QueueMetrics.pSourceName(partition) + .append(QueueMetrics.qSourceName(queue)).append(",user=") + .append(user).toString()); + return s; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + } + + private static AppSchedulingInfo mockApp(String user) { + AppSchedulingInfo app = mock(AppSchedulingInfo.class); + when(app.getUser()).thenReturn(user); + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + ApplicationAttemptId id = BuilderUtils.newApplicationAttemptId(appId, 1); + when(app.getApplicationAttemptId()).thenReturn(id); + return app; + } + + public static void checkResources(MetricsSource source, long allocatedMB, + int allocatedCores, int allocCtnrs, long aggreAllocCtnrs, + long aggreReleasedCtnrs, long availableMB, int availableCores, + long pendingMB, int pendingCores, int pendingCtnrs, long reservedMB, + int reservedCores, int reservedCtnrs) { + MetricsRecordBuilder rb = getMetrics(source); + assertGauge("AllocatedMB", allocatedMB, rb); + assertGauge("AllocatedVCores", allocatedCores, rb); + assertGauge("AllocatedContainers", allocCtnrs, rb); + assertCounter("AggregateContainersAllocated", aggreAllocCtnrs, rb); + assertCounter("AggregateContainersReleased", aggreReleasedCtnrs, rb); + assertGauge("AvailableMB", availableMB, rb); + assertGauge("AvailableVCores", availableCores, rb); + assertGauge("PendingMB", pendingMB, rb); + assertGauge("PendingVCores", pendingCores, rb); + assertGauge("PendingContainers", pendingCtnrs, rb); + assertGauge("ReservedMB", reservedMB, rb); + assertGauge("ReservedVCores", reservedCores, rb); + assertGauge("ReservedContainers", reservedCtnrs, rb); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index c3fe5f5145d..b9ba747539e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -98,7 +98,7 @@ public class TestSchedulerApplicationAttempt { app.liveContainers.put(container1.getContainerId(), container1); SchedulerNode node = createNode(); app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, - toSchedulerKey(requestedPriority), container1.getContainer()); + toSchedulerKey(requestedPriority), container1); // Active user count has to decrease from queue2 due to app has NO pending requests assertEquals(0, queue2.getAbstractUsersManager().getNumActiveUsers()); @@ -140,7 +140,7 @@ public class TestSchedulerApplicationAttempt { app.liveContainers.put(container1.getContainerId(), container1); SchedulerNode node = createNode(); app.appSchedulingInfo.allocate(NodeType.OFF_SWITCH, node, - toSchedulerKey(requestedPriority), container1.getContainer()); + toSchedulerKey(requestedPriority), container1); // Reserved container Priority prio1 = Priority.newInstance(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index 67463b1037e..fd9a1baa32e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels .NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -495,6 +496,11 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { } protected MockRM setupSchedulerInstance() throws Exception { + + if (mockRM != null) { + mockRM.stop(); + } + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, @@ -579,16 +585,26 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { autoCreatedLeafQueue.getMaxApplicationsPerUser()); } - protected void validateInitialQueueEntitlement(CSQueue parentQueue, String - leafQueueName, Map - expectedTotalChildQueueAbsCapacityByLabel, + protected void validateInitialQueueEntitlement(CSQueue parentQueue, + String leafQueueName, + Map expectedTotalChildQueueAbsCapacityByLabel, Set nodeLabels) throws SchedulerDynamicEditException, InterruptedException { - validateInitialQueueEntitlement(cs, parentQueue, leafQueueName, + validateInitialQueueEntitlement(mockRM, cs, parentQueue, leafQueueName, expectedTotalChildQueueAbsCapacityByLabel, nodeLabels); } - protected void validateInitialQueueEntitlement( + protected void validateInitialQueueEntitlement(ResourceManager rm, + CSQueue parentQueue, String leafQueueName, + Map expectedTotalChildQueueAbsCapacityByLabel, + Set nodeLabels) + throws SchedulerDynamicEditException, InterruptedException { + validateInitialQueueEntitlement(rm, + (CapacityScheduler) rm.getResourceScheduler(), parentQueue, + leafQueueName, expectedTotalChildQueueAbsCapacityByLabel, nodeLabels); + } + + protected void validateInitialQueueEntitlement(ResourceManager rm, CapacityScheduler capacityScheduler, CSQueue parentQueue, String leafQueueName, Map expectedTotalChildQueueAbsCapacityByLabel, @@ -601,7 +617,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { (GuaranteedOrZeroCapacityOverTimePolicy) autoCreateEnabledParentQueue .getAutoCreatedQueueManagementPolicy(); - AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName); + AutoCreatedLeafQueue leafQueue = + (AutoCreatedLeafQueue) capacityScheduler.getQueue(leafQueueName); Map expectedEntitlements = new HashMap<>(); QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate() @@ -619,7 +636,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { expectedEntitlements.put(label, expectedEntitlement); - validateEffectiveMinResource(leafQueue, label, expectedEntitlements); + validateEffectiveMinResource(rm, capacityScheduler, leafQueue, label, + expectedEntitlements); } } @@ -636,24 +654,24 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { .getMaximumCapacity(label), EPSILON); } - protected void validateEffectiveMinResource(CSQueue leafQueue, - String label, Map expectedQueueEntitlements) { + protected void validateEffectiveMinResource(ResourceManager rm, + CapacityScheduler cs, CSQueue leafQueue, String label, + Map expectedQueueEntitlements) { ManagedParentQueue parentQueue = (ManagedParentQueue) leafQueue.getParent(); - Resource resourceByLabel = mockRM.getRMContext().getNodeLabelManager(). - getResourceByLabel(label, cs.getClusterResource()); + Resource resourceByLabel = rm.getRMContext().getNodeLabelManager() + .getResourceByLabel(label, cs.getClusterResource()); Resource effMinCapacity = Resources.multiply(resourceByLabel, - expectedQueueEntitlements.get(label).getCapacity() * parentQueue - .getQueueCapacities().getAbsoluteCapacity(label)); + expectedQueueEntitlements.get(label).getCapacity() + * parentQueue.getQueueCapacities().getAbsoluteCapacity(label)); assertEquals(effMinCapacity, Resources.multiply(resourceByLabel, leafQueue.getQueueCapacities().getAbsoluteCapacity(label))); assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label)); if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) { - assertTrue(Resources - .greaterThan(cs.getResourceCalculator(), cs.getClusterResource(), - effMinCapacity, Resources.none())); - } else{ + assertTrue(Resources.greaterThan(cs.getResourceCalculator(), + cs.getClusterResource(), effMinCapacity, Resources.none())); + } else { assertTrue(Resources.equals(effMinCapacity, Resources.none())); } } @@ -764,7 +782,7 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { updatedQueueTemplate.getQueueCapacities().getMaximumCapacity (label)); assertEquals(expectedQueueEntitlements.get(label), newEntitlement); - validateEffectiveMinResource(leafQueue, label, + validateEffectiveMinResource(mockRM, cs, leafQueue, label, expectedQueueEntitlements); } found = true; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index f859ca73285..76e1ada7d1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -622,7 +622,7 @@ public class TestCapacitySchedulerAutoQueueCreation submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1); Map expectedAbsChildQueueCapacity = populateExpectedAbsCapacityByLabelForParentQueue(1); - validateInitialQueueEntitlement(newCS, parentQueue, USER1, + validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1, expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); //submit another app2 as USER2 @@ -630,7 +630,7 @@ public class TestCapacitySchedulerAutoQueueCreation 1); expectedAbsChildQueueCapacity = populateExpectedAbsCapacityByLabelForParentQueue(2); - validateInitialQueueEntitlement(newCS, parentQueue, USER2, + validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2, expectedAbsChildQueueCapacity, accessibleNodeLabelsOnC); //validate total activated abs capacity remains the same @@ -725,7 +725,7 @@ public class TestCapacitySchedulerAutoQueueCreation Map expectedChildQueueAbsCapacity = populateExpectedAbsCapacityByLabelForParentQueue(1); - validateInitialQueueEntitlement(newCS, parentQueue, USER1, + validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER1, expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC); //submit another app2 as USER2 @@ -734,7 +734,7 @@ public class TestCapacitySchedulerAutoQueueCreation 1); expectedChildQueueAbsCapacity = populateExpectedAbsCapacityByLabelForParentQueue(2); - validateInitialQueueEntitlement(newCS, parentQueue, USER2, + validateInitialQueueEntitlement(newMockRM, newCS, parentQueue, USER2, expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC); //update parent queue capacity diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 510ceb474f1..5bf8b05c48e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1235,9 +1235,9 @@ public class TestLeafQueue { qb.finishApplication(app_0.getApplicationId(), user_0); qb.finishApplication(app_2.getApplicationId(), user_1); qb.releaseResource(clusterResource, app_0, Resource.newInstance(4*GB, 1), - null, null); + "", null); qb.releaseResource(clusterResource, app_2, Resource.newInstance(4*GB, 1), - null, null); + "", null); qb.setUserLimit(50); qb.setUserLimitFactor(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index 737db5b0d65..144f11f240a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -25,9 +25,11 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; @@ -47,11 +49,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestPartitionQueueMetrics; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; @@ -2032,10 +2037,9 @@ public class TestNodeLabelContainerAllocation { assertEquals(0 * GB, leafQueueB.getMetrics().getAvailableMB()); assertEquals(0 * GB, leafQueueB.getMetrics().getAllocatedMB()); - // The total memory tracked by QueueMetrics is 0GB for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(0*GB, rootQueue.getMetrics().getAvailableMB() + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); @@ -2084,6 +2088,8 @@ public class TestNodeLabelContainerAllocation { csConf.setCapacityByLabel(queueB, "x", 50); csConf.setMaximumCapacityByLabel(queueB, "x", 50); + csConf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true); + // set node -> label mgr.addToCluserNodeLabels( ImmutableSet.of(NodeLabel.newInstance("x", false))); @@ -2102,6 +2108,54 @@ public class TestNodeLabelContainerAllocation { rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + double delta = 0.0001; + CSQueue leafQueue = cs.getQueue("a"); + CSQueue leafQueueB = cs.getQueue("b"); + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + + MetricsSystem ms = leafQueueB.getMetrics().getMetricsSystem(); + QueueMetrics partXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x"); + QueueMetrics partDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, ""); + QueueMetrics queueAMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a"); + QueueMetrics queueBMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.b"); + QueueMetrics queueAPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a"); + QueueMetrics queueAPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a"); + QueueMetrics queueBPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.b"); + QueueMetrics queueBPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.b"); + QueueMetrics rootMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root"); + assertEquals(10 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + assertEquals(10 * GB, rootMetrics.getAvailableMB(), delta); + assertEquals(2.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta); + // app1 -> a RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2); @@ -2109,47 +2163,73 @@ public class TestNodeLabelContainerAllocation { // app1 asks for 3 partition= containers am1.allocate("*", 1 * GB, 3, new ArrayList()); - // NM1 do 50 heartbeats - CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); - - SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); for (int i = 0; i < 50; i++) { cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } // app1 gets all resource in partition=x (non-exclusive) Assert.assertEquals(3, schedulerNode1.getNumContainers()); - SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() .getNodeReport(nm1.getNodeId()); Assert.assertEquals(3 * GB, reportNm1.getUsedResource().getMemorySize()); Assert.assertEquals(7 * GB, reportNm1.getAvailableResource().getMemorySize()); - SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() .getNodeReport(nm2.getNodeId()); Assert.assertEquals(1 * GB, reportNm2.getUsedResource().getMemorySize()); Assert.assertEquals(9 * GB, reportNm2.getAvailableResource().getMemorySize()); - - LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - // 3GB is used from label x quota. 1.5 GB is remaining from default label. - // 2GB is remaining from label x. - assertEquals(15 * GB / 10, leafQueue.getMetrics().getAvailableMB()); + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(9 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(5 * GB, queueBPartXMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAMetrics.getAllocatedMB(), delta); + assertEquals(1.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueBMetrics.getAllocatedMB(), delta); + assertEquals(7.5 * GB, queueBMetrics.getAvailableMB(), delta); + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueBPartXMetrics.getPendingMB(), delta); + assertEquals(1.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + QueueMetrics partDefaultQueueAUserMetrics = + (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "", "user", + "root.a"); + QueueMetrics partXQueueAUserMetrics = + (QueueMetrics) TestPartitionQueueMetrics.userSource(ms, "x", "user", + "root.a"); + QueueMetrics queueAUserMetrics = + (QueueMetrics) TestQueueMetrics.userSource(ms, "root.a", "user"); + + assertEquals(2 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(1.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); - // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); - // NM2 do couple of heartbeats - RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); - - SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); - // app1 gets all resource in default partition Assert.assertEquals(2, schedulerNode2.getNumContainers()); + Assert.assertEquals(3, schedulerNode1.getNumContainers()); // 3GB is used from label x quota. 2GB used from default label. // So 0.5 GB is remaining from default label. @@ -2158,10 +2238,100 @@ public class TestNodeLabelContainerAllocation { // The total memory tracked by QueueMetrics is 10GB // for the default partition - CSQueue rootQueue = cs.getRootQueue(); assertEquals(10*GB, rootQueue.getMetrics().getAvailableMB() + rootQueue.getMetrics().getAllocatedMB()); + assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), + delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(2 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(7 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + // Pending Resources when containers are waiting on "default" partition + assertEquals(4 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(4 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(4 * GB, partDefaultQueueAUserMetrics.getPendingMB(), + delta); + assertEquals(4 * GB, queueAUserMetrics.getPendingMB(), delta); + assertEquals(4 * GB, partDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXMetrics.getPendingMB(), delta); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + assertEquals(0.5 * GB, queueAMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAPartDefaultMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(7 * GB, queueAPartXMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), + delta); + assertEquals(2 * GB, partDefaultQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(7 * GB, partXQueueAUserMetrics.getAllocatedMB(), delta); + assertEquals(1 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, queueAUserMetrics.getAllocatedMB(), delta); + assertEquals(3 * GB, partXMetrics.getAvailableMB(), delta); + assertEquals(7 * GB, partXMetrics.getAllocatedMB(), delta); + assertEquals(8 * GB, partDefaultMetrics.getAvailableMB(), delta); + assertEquals(2 * GB, partDefaultMetrics.getAllocatedMB(), delta); + + // Pending Resources after containers has been assigned on "x" partition + assertEquals(0 * GB, queueAMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partDefaultQueueAUserMetrics.getPendingMB(), + delta); + assertEquals(0 * GB, queueAUserMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partDefaultMetrics.getPendingMB(), delta); + assertEquals(0 * GB, queueAPartXMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXQueueAUserMetrics.getPendingMB(), delta); + assertEquals(0 * GB, partXMetrics.getPendingMB(), delta); + + rm1.killApp(app1.getApplicationId()); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + assertEquals(10 * GB, rootQueue.getMetrics().getAvailableMB(), delta); + assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(7.5 * GB, leafQueueB.getMetrics().getAvailableMB(), delta); + assertEquals(2, queueAMetrics.getAggregateAllocatedContainers()); + assertEquals(2, queueAMetrics.getAggegatedReleasedContainers()); + assertEquals(2, queueAPartDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(2, queueAPartDefaultMetrics.getAggegatedReleasedContainers()); + assertEquals(7, partXMetrics.getAggregateAllocatedContainers()); + assertEquals(2, partDefaultMetrics.getAggregateAllocatedContainers()); + assertEquals(7, queueAPartXMetrics.getAggregateAllocatedContainers()); + assertEquals(7, queueAPartXMetrics.getAggegatedReleasedContainers()); + assertEquals(2.5 * GB, queueAPartDefaultMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, queueAPartXMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, queueAUserMetrics.getAvailableMB(), delta); + assertEquals(3 * GB, partDefaultQueueAUserMetrics.getAvailableMB(), delta); + assertEquals(5 * GB, partXQueueAUserMetrics.getAvailableMB(), delta); rm1.close(); } @@ -2278,8 +2448,8 @@ public class TestNodeLabelContainerAllocation { // The total memory tracked by QueueMetrics is 12GB // for the default partition CSQueue rootQueue = cs.getRootQueue(); - assertEquals(12*GB, rootQueue.getMetrics().getAvailableMB() + - rootQueue.getMetrics().getAllocatedMB()); + assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); @@ -2291,6 +2461,193 @@ public class TestNodeLabelContainerAllocation { rm1.close(); } + @Test + public void testTwoLevelQueueMetricsWithLabels() throws Exception { + + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration( + this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a"}); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String queueA = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(queueA, 100); + csConf.setAccessibleNodeLabels(queueA, toSet("x")); + csConf.setCapacityByLabel(queueA, "x", 100); + csConf.setMaximumCapacityByLabel(queueA, "x", 100); + + csConf.setQueues(queueA, new String[] {"a1"}); + final String queueA1 = queueA + ".a1"; + csConf.setCapacity(queueA1, 100); + + csConf.setAccessibleNodeLabels(queueA1, toSet("x")); + csConf.setCapacityByLabel(queueA1, "x", 100); + csConf.setMaximumCapacityByLabel(queueA1, "x", 100); + + // set node -> label + // label x exclusivity is set to true + mgr.addToCluserNodeLabels( + ImmutableSet.of(NodeLabel.newInstance("x", true))); + mgr.addLabelsToNode( + ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm1 = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm1.getRMContext().setNodeLabelManager(mgr); + rm1.start(); + MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm1.registerNode("h2:1234", 12 * GB); // label = + + CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); + ParentQueue leafQueueA = (ParentQueue) cs.getQueue("a"); + LeafQueue leafQueueA1 = (LeafQueue) cs.getQueue("a1"); + assertEquals(12 * GB, leafQueueA1.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueueA1.getMetrics().getAllocatedMB()); + MetricsSystem ms = leafQueueA1.getMetrics().getMetricsSystem(); + QueueMetrics partXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, "x"); + QueueMetrics partDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.partitionSource(ms, ""); + QueueMetrics queueAPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a"); + QueueMetrics queueAPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a"); + QueueMetrics queueA1PartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root.a.a1"); + QueueMetrics queueA1PartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root.a.a1"); + QueueMetrics queueRootPartDefaultMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "", "root"); + QueueMetrics queueRootPartXMetrics = + (QueueMetrics) TestPartitionQueueMetrics.queueSource(ms, "x", "root"); + QueueMetrics queueAMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a"); + QueueMetrics queueA1Metrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root.a.a1"); + QueueMetrics queueRootMetrics = + (QueueMetrics) TestQueueMetrics.queueSource(ms, "root"); + assertEquals(12 * GB, queueAMetrics.getAvailableMB()); + assertEquals(12 * GB, queueA1Metrics.getAvailableMB()); + assertEquals(12 * GB, queueRootMetrics.getAvailableMB()); + assertEquals(12 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(10 * GB, queueA1PartXMetrics.getAvailableMB()); + assertEquals(10 * GB, queueAPartXMetrics.getAvailableMB()); + assertEquals(10 * GB, queueRootPartXMetrics.getAvailableMB()); + assertEquals(12 * GB, queueA1PartDefaultMetrics.getAvailableMB()); + assertEquals(12 * GB, queueAPartDefaultMetrics.getAvailableMB()); + assertEquals(12 * GB, queueRootPartDefaultMetrics.getAvailableMB()); + assertEquals(10 * GB, partXMetrics.getAvailableMB()); + assertEquals(12 * GB, partDefaultMetrics.getAvailableMB()); + + // app1 -> a + RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1", "x"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); + + // app1 asks for 5 partition=x containers + am1.allocate("*", 1 * GB, 5, new ArrayList(), "x"); + // NM1 do 50 heartbeats + RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId()); + + SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode1.getNumContainers()); + + SchedulerNodeReport reportNm1 = rm1.getResourceScheduler() + .getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, reportNm1.getAvailableResource().getMemorySize()); + + SchedulerNodeReport reportNm2 = rm1.getResourceScheduler() + .getNodeReport(nm2.getNodeId()); + Assert.assertEquals(0 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(12 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(0 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueA1Metrics.getAllocatedMB()); + assertEquals(0 * GB, queueRootMetrics.getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(0 * GB, leafQueueA.getMetrics().getAllocatedMB()); + assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueA1PartDefaultMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueAPartDefaultMetrics.getAllocatedMB()); + assertEquals(0 * GB, queueRootPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, partXMetrics.getAllocatedMB()); + assertEquals(0 * GB, partDefaultMetrics.getAllocatedMB()); + + // app2 -> a + RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a1", ""); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2); + + // app2 asks for 5 partition= containers + am2.allocate("*", 1 * GB, 5, new ArrayList(), ""); + // NM2 do 50 heartbeats + RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); + + SchedulerNode schedulerNode2 = cs.getSchedulerNode(nm2.getNodeId()); + + for (int i = 0; i < 50; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode2)); + } + + // app1 gets all resource in partition=x + Assert.assertEquals(6, schedulerNode2.getNumContainers()); + + reportNm1 = rm1.getResourceScheduler().getNodeReport(nm1.getNodeId()); + Assert.assertEquals(6 * GB, reportNm1.getUsedResource().getMemorySize()); + Assert.assertEquals(4 * GB, + reportNm1.getAvailableResource().getMemorySize()); + + reportNm2 = rm1.getResourceScheduler().getNodeReport(nm2.getNodeId()); + Assert.assertEquals(6 * GB, reportNm2.getUsedResource().getMemorySize()); + Assert.assertEquals(6 * GB, + reportNm2.getAvailableResource().getMemorySize()); + + assertEquals(6 * GB, leafQueueA.getMetrics().getAvailableMB()); + assertEquals(6 * GB, leafQueueA.getMetrics().getAllocatedMB()); + + // The total memory tracked by QueueMetrics is 12GB + // for the default partition + CSQueue rootQueue = cs.getRootQueue(); + assertEquals(12 * GB, rootQueue.getMetrics().getAvailableMB() + + rootQueue.getMetrics().getAllocatedMB()); + + assertEquals(6 * GB, queueAMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1Metrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1PartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartXMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueA1PartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueAPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, queueRootPartDefaultMetrics.getAllocatedMB()); + assertEquals(6 * GB, partXMetrics.getAllocatedMB()); + assertEquals(6 * GB, partDefaultMetrics.getAllocatedMB()); + + // Kill all apps in queue a + cs.killAllAppsInQueue("a1"); + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm1.waitForAppRemovedFromScheduler(app1.getApplicationId()); + assertEquals(0 * GB, leafQueueA.getMetrics().getUsedAMResourceMB()); + assertEquals(0, leafQueueA.getMetrics().getUsedAMResourceVCores()); + rm1.close(); + } + @Test public void testQueueMetricsWithLabelsDisableElasticity() throws Exception { /**