From d573f09fb93dbb711d504620af5d73840ea063a6 Mon Sep 17 00:00:00 2001 From: Jian He Date: Fri, 17 Apr 2015 13:36:46 -0700 Subject: [PATCH] YARN-2696. Queue sorting in CapacityScheduler should consider node label. Contributed by Wangda Tan --- hadoop-yarn-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 6 +- .../nodelabels/RMNodeLabelsManager.java | 2 +- .../scheduler/AbstractYarnScheduler.java | 4 + .../scheduler/ResourceUsage.java | 10 + .../scheduler/capacity/AbstractCSQueue.java | 12 +- .../scheduler/capacity/CSQueueUtils.java | 129 +++-- .../scheduler/capacity/CapacityScheduler.java | 15 +- .../capacity/CapacitySchedulerContext.java | 4 +- .../scheduler/capacity/LeafQueue.java | 5 +- .../scheduler/capacity/ParentQueue.java | 46 +- .../capacity/PartitionedQueueComparator.java | 68 +++ .../scheduler/capacity/QueueCapacities.java | 11 +- .../scheduler/capacity/ReservationQueue.java | 7 +- .../capacity/TestApplicationLimits.java | 12 +- .../capacity/TestChildQueueOrder.java | 4 +- .../scheduler/capacity/TestLeafQueue.java | 4 +- .../TestNodeLabelContainerAllocation.java | 451 ++++++++++++++++++ .../scheduler/capacity/TestParentQueue.java | 4 +- .../scheduler/capacity/TestReservations.java | 4 +- .../scheduler/fifo/TestFifoScheduler.java | 4 + 21 files changed, 722 insertions(+), 83 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c4d7d18c960..3fbaf66d3fc 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -139,6 +139,9 @@ Release 2.8.0 - UNRELEASED YARN-3404. Display queue name on application page. (Ryu Kobayashi via jianhe) + YARN-2696. Queue sorting in CapacityScheduler should consider node label. + (Wangda Tan via jianhe) + OPTIMIZATIONS YARN-3339. TestDockerContainerExecutor should pull a single image and not diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml index 4b01a4d5448..8aae152f936 100644 --- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml +++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml @@ -141,10 +141,14 @@ - + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java index 574e24c3512..25e5bc0989a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/RMNodeLabelsManager.java @@ -254,7 +254,7 @@ public class RMNodeLabelsManager extends CommonNodeLabelsManager { } } - public void updateNodeResource(NodeId node, Resource newResource) throws IOException { + public void updateNodeResource(NodeId node, Resource newResource) { deactivateNode(node); activateNode(node, newResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 6699b055882..1a8c653cc58 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -548,6 +548,10 @@ public abstract class AbstractYarnScheduler Resource newResource = resourceOption.getResource(); Resource oldResource = node.getTotalResource(); if(!oldResource.equals(newResource)) { + // Notify NodeLabelsManager about this change + rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(), + newResource); + // Log resource change LOG.info("Update resource on node: " + node.getNodeName() + " from: " + oldResource + ", to: " diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java index 2f7e19dd2cb..88e93c1689e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; @@ -372,4 +373,13 @@ public class ResourceUsage { readLock.unlock(); } } + + public Set getNodePartitionsSet() { + try { + readLock.lock(); + return usages.keySet(); + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index d95c45c79be..550c6aa2fdb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -271,8 +271,8 @@ public abstract class AbstractCSQueue implements CSQueue { this.acls = csContext.getConfiguration().getAcls(getQueuePath()); // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); // Check if labels of this queue is a subset of parent queue, only do this // when we not root @@ -351,16 +351,16 @@ public abstract class AbstractCSQueue implements CSQueue { queueUsage.incUsed(nodePartition, resource); ++numContainers; - CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(), - clusterResource, minimumAllocation); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, nodePartition); } protected synchronized void releaseResource(Resource clusterResource, Resource resource, String nodePartition) { queueUsage.decUsed(nodePartition, resource); - CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(), - clusterResource, minimumAllocation); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, nodePartition); --numContainers; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 1921195c220..8f9362e1686 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -20,18 +20,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.HashSet; import java.util.Set; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; +import com.google.common.collect.Sets; + class CSQueueUtils { - - private static final Log LOG = LogFactory.getLog(CSQueueUtils.class); final static float EPSILON = 0.0001f; @@ -188,41 +187,103 @@ class CSQueueUtils { } } - @Lock(CSQueue.class) - public static void updateQueueStatistics( - final ResourceCalculator calculator, - final CSQueue childQueue, final CSQueue parentQueue, - final Resource clusterResource, final Resource minimumAllocation) { - Resource queueLimit = Resources.none(); - Resource usedResources = childQueue.getUsedResources(); - + /** + * Update partitioned resource usage, if nodePartition == null, will update + * used resource for all partitions of this queue. + */ + private static void updateUsedCapacity(final ResourceCalculator rc, + final Resource totalPartitionResource, final Resource minimumAllocation, + ResourceUsage queueResourceUsage, QueueCapacities queueCapacities, + String nodePartition) { float absoluteUsedCapacity = 0.0f; float usedCapacity = 0.0f; - if (Resources.greaterThan( - calculator, clusterResource, clusterResource, Resources.none())) { - queueLimit = - Resources.multiply(clusterResource, childQueue.getAbsoluteCapacity()); - absoluteUsedCapacity = - Resources.divide(calculator, clusterResource, - usedResources, clusterResource); - usedCapacity = - Resources.equals(queueLimit, Resources.none()) ? 0 : - Resources.divide(calculator, clusterResource, - usedResources, queueLimit); + if (Resources.greaterThan(rc, totalPartitionResource, + totalPartitionResource, Resources.none())) { + // queueGuaranteed = totalPartitionedResource * + // absolute_capacity(partition) + Resource queueGuranteedResource = + Resources.multiply(totalPartitionResource, + queueCapacities.getAbsoluteCapacity(nodePartition)); + + // make queueGuranteed >= minimum_allocation to avoid divided by 0. + queueGuranteedResource = + Resources.max(rc, totalPartitionResource, queueGuranteedResource, + minimumAllocation); + + Resource usedResource = queueResourceUsage.getUsed(nodePartition); + absoluteUsedCapacity = + Resources.divide(rc, totalPartitionResource, usedResource, + totalPartitionResource); + usedCapacity = + Resources.divide(rc, totalPartitionResource, usedResource, + queueGuranteedResource); + } + + queueCapacities + .setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity); + queueCapacities.setUsedCapacity(nodePartition, usedCapacity); + } + + private static Resource getNonPartitionedMaxAvailableResourceToQueue( + final ResourceCalculator rc, Resource totalNonPartitionedResource, + CSQueue queue) { + Resource queueLimit = Resources.none(); + Resource usedResources = queue.getUsedResources(); + + if (Resources.greaterThan(rc, totalNonPartitionedResource, + totalNonPartitionedResource, Resources.none())) { + queueLimit = + Resources.multiply(totalNonPartitionedResource, + queue.getAbsoluteCapacity()); } - childQueue.setUsedCapacity(usedCapacity); - childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity); - Resource available = Resources.subtract(queueLimit, usedResources); - childQueue.getMetrics().setAvailableResourcesToQueue( - Resources.max( - calculator, - clusterResource, - available, - Resources.none() - ) - ); + return Resources.max(rc, totalNonPartitionedResource, available, + Resources.none()); + } + + /** + *

+ * Update Queue Statistics: + *

+ * + *
  • used-capacity/absolute-used-capacity by partition
  • + *
  • non-partitioned max-avail-resource to queue
  • + * + *

    + * When nodePartition is null, all partition of + * used-capacity/absolute-used-capacity will be updated. + *

    + */ + @Lock(CSQueue.class) + public static void updateQueueStatistics( + final ResourceCalculator rc, final Resource cluster, final Resource minimumAllocation, + final CSQueue childQueue, final RMNodeLabelsManager nlm, + final String nodePartition) { + QueueCapacities queueCapacities = childQueue.getQueueCapacities(); + ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage(); + + if (nodePartition == null) { + for (String partition : Sets.union( + queueCapacities.getNodePartitionsSet(), + queueResourceUsage.getNodePartitionsSet())) { + updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster), + minimumAllocation, queueResourceUsage, queueCapacities, partition); + } + } else { + updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster), + minimumAllocation, queueResourceUsage, queueCapacities, nodePartition); + } + + // Now in QueueMetrics, we only store available-resource-to-queue for + // default partition. + if (nodePartition == null + || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) { + childQueue.getMetrics().setAvailableResourcesToQueue( + getNonPartitionedMaxAvailableResourceToQueue(rc, + nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster), + childQueue)); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cfeee37d1e6..5d58b153953 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -136,7 +136,8 @@ public class CapacityScheduler extends // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; - static final Comparator queueComparator = new Comparator() { + static final Comparator nonPartitionedQueueComparator = + new Comparator() { @Override public int compare(CSQueue q1, CSQueue q2) { if (q1.getUsedCapacity() < q2.getUsedCapacity()) { @@ -148,6 +149,9 @@ public class CapacityScheduler extends return q1.getQueuePath().compareTo(q2.getQueuePath()); } }; + + static final PartitionedQueueComparator partitionedQueueComparator = + new PartitionedQueueComparator(); static final Comparator applicationComparator = new Comparator() { @@ -274,8 +278,13 @@ public class CapacityScheduler extends } @Override - public Comparator getQueueComparator() { - return queueComparator; + public Comparator getNonPartitionedQueueComparator() { + return nonPartitionedQueueComparator; + } + + @Override + public PartitionedQueueComparator getPartitionedQueueComparator() { + return partitionedQueueComparator; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java index 28dc98859d5..707c4638638 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java @@ -58,7 +58,9 @@ public interface CapacitySchedulerContext { ResourceCalculator getResourceCalculator(); - Comparator getQueueComparator(); + Comparator getNonPartitionedQueueComparator(); + + PartitionedQueueComparator getPartitionedQueueComparator(); FiCaSchedulerNode getNode(NodeId nodeId); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 8a6a601f202..f860574a816 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1814,9 +1814,8 @@ public class LeafQueue extends AbstractCSQueue { setQueueResourceLimitsInfo(clusterResource); // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, getParent(), clusterResource, - minimumAllocation); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); // queue metrics are updated, more resource may be available // activate the pending applications if possible diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index eb64d4384f0..53142b53231 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; @@ -68,7 +69,8 @@ public class ParentQueue extends AbstractCSQueue { protected final Set childQueues; private final boolean rootQueue; - final Comparator queueComparator; + final Comparator nonPartitionedQueueComparator; + final PartitionedQueueComparator partitionQueueComparator; volatile int numApplications; private final CapacitySchedulerContext scheduler; @@ -79,7 +81,8 @@ public class ParentQueue extends AbstractCSQueue { String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); this.scheduler = cs; - this.queueComparator = cs.getQueueComparator(); + this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator(); + this.partitionQueueComparator = cs.getPartitionedQueueComparator(); this.rootQueue = (parent == null); @@ -92,7 +95,7 @@ public class ParentQueue extends AbstractCSQueue { ". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE); } - this.childQueues = new TreeSet(queueComparator); + this.childQueues = new TreeSet(nonPartitionedQueueComparator); setupQueueConfigs(cs.getClusterResource()); @@ -522,6 +525,17 @@ public class ParentQueue extends AbstractCSQueue { return new ResourceLimits(childLimit); } + private Iterator sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) { + if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) { + return childQueues.iterator(); + } + + partitionQueueComparator.setPartitionToLookAt(node.getPartition()); + List childrenList = new ArrayList<>(childQueues); + Collections.sort(childrenList, partitionQueueComparator); + return childrenList.iterator(); + } + private synchronized CSAssignment assignContainersToChildQueues( Resource cluster, FiCaSchedulerNode node, ResourceLimits limits, SchedulingMode schedulingMode) { @@ -531,7 +545,8 @@ public class ParentQueue extends AbstractCSQueue { printChildQueues(); // Try to assign to most 'under-served' sub-queue - for (Iterator iter = childQueues.iterator(); iter.hasNext();) { + for (Iterator iter = sortAndGetChildrenAllocationIterator(node); iter + .hasNext();) { CSQueue childQueue = iter.next(); if(LOG.isDebugEnabled()) { LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath() @@ -554,13 +569,17 @@ public class ParentQueue extends AbstractCSQueue { if (Resources.greaterThan( resourceCalculator, cluster, assignment.getResource(), Resources.none())) { - // Remove and re-insert to sort - iter.remove(); - LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + - " stats: " + childQueue); - childQueues.add(childQueue); - if (LOG.isDebugEnabled()) { - printChildQueues(); + // Only update childQueues when we doing non-partitioned node + // allocation. + if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) { + // Remove and re-insert to sort + iter.remove(); + LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() + + " stats: " + childQueue); + childQueues.add(childQueue); + if (LOG.isDebugEnabled()) { + printChildQueues(); + } } break; } @@ -647,9 +666,8 @@ public class ParentQueue extends AbstractCSQueue { childQueue.updateClusterResource(clusterResource, childLimits); } - // Update metrics - CSQueueUtils.updateQueueStatistics( - resourceCalculator, this, parent, clusterResource, minimumAllocation); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java new file mode 100644 index 00000000000..ddcc719ce00 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PartitionedQueueComparator.java @@ -0,0 +1,68 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.Comparator; + +public class PartitionedQueueComparator implements Comparator { + private String partitionToLookAt = null; + + public void setPartitionToLookAt(String partitionToLookAt) { + this.partitionToLookAt = partitionToLookAt; + } + + + @Override + public int compare(CSQueue q1, CSQueue q2) { + /* + * 1. Check accessible to given partition, if one queue accessible and + * the other not, accessible queue goes first. + */ + boolean q1Accessible = + q1.getAccessibleNodeLabels().contains(partitionToLookAt); + boolean q2Accessible = + q2.getAccessibleNodeLabels().contains(partitionToLookAt); + if (q1Accessible && !q2Accessible) { + return -1; + } else if (!q1Accessible && q2Accessible) { + return 1; + } + + /* + * + * 2. When two queue has same accessibility, check who will go first: + * Now we simply compare their used resource on the partition to lookAt + */ + float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt); + float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt); + if (Math.abs(used1 - used2) < 1e-6) { + // When used capacity is same, compare their guaranteed-capacity + float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt); + float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt); + + // when cap1 == cap2, we will compare queue's name + if (Math.abs(cap1 - cap2) < 1e-6) { + return q1.getQueueName().compareTo(q2.getQueueName()); + } + return Float.compare(cap2, cap1); + } + + return Float.compare(used1, used2); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java index 962a636eb02..d0a26d6b209 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java @@ -30,8 +30,6 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import com.google.common.collect.Sets; - public class QueueCapacities { private static final String NL = CommonNodeLabelsManager.NO_LABEL; private static final float LABEL_DOESNT_EXIST_CAP = 0f; @@ -254,4 +252,13 @@ public class QueueCapacities { readLock.unlock(); } } + + public Set getNodePartitionsSet() { + try { + readLock.lock(); + return capacitiesMap.keySet(); + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java index a8d17cfd0d1..4790cc7e942 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java @@ -63,10 +63,9 @@ public class ReservationQueue extends LeafQueue { + " from " + newlyParsedQueue.getQueuePath()); } super.reinitialize(newlyParsedQueue, clusterResource); - CSQueueUtils.updateQueueStatistics( - parent.schedulerContext.getResourceCalculator(), newlyParsedQueue, - parent, parent.schedulerContext.getClusterResource(), - parent.schedulerContext.getMinimumResourceCapability()); + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + minimumAllocation, this, labelManager, null); + updateQuotas(parent.getUserLimitForReservation(), parent.getUserLimitFactor(), parent.getMaxApplicationsForReservations(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index 46167ca6859..a41fdfa5aba 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -95,8 +95,8 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()). + thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -255,8 +255,8 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(16*GB, 16)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()). + thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); @@ -554,8 +554,8 @@ public class TestApplicationLimits { thenReturn(Resources.createResource(16*GB)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()). + thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java index 970a98ad576..2608dcb5d35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java @@ -96,8 +96,8 @@ public class TestChildQueueOrder { thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()). + thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 0b5250b4fae..0a196041bd9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -152,8 +152,8 @@ public class TestLeafQueue { thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()). + thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index cf1b26f37e9..5155db53631 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -1024,4 +1024,455 @@ public class TestNodeLabelContainerAllocation { rm1.close(); } + + private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs, + String nodePartition, float usedCapacity, float absoluteUsedCapacity) { + float epsilon = 1e-6f; + CSQueue queue = cs.getQueue(queueName); + Assert.assertNotNull("Failed to get queue=" + queueName, queue); + + Assert.assertEquals(usedCapacity, queue.getQueueCapacities() + .getUsedCapacity(nodePartition), epsilon); + Assert.assertEquals(absoluteUsedCapacity, queue.getQueueCapacities() + .getAbsoluteUsedCapacity(nodePartition), epsilon); + } + + private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nodeId); + for (int i = 0; i < nHeartbeat; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + } + } + + private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum) + throws InterruptedException { + int totalWaitTick = 100; // wait 10 sec at most. + while (expectedNodeNum > rm.getResourceScheduler().getNumClusterNodes() + && totalWaitTick > 0) { + Thread.sleep(100); + totalWaitTick--; + } + } + + @Test + public void testQueueUsedCapacitiesUpdate() + throws Exception { + /** + * Test case: have a following queue structure: + * + *
    +     *            root
    +     *         /      \
    +     *        a        b
    +     *       / \      (x)
    +     *      a1  a2
    +     *     (x)  (x)
    +     * 
    + * + * Both a/b can access x, we need to verify when + *
    +     * 1) container allocated/released in both partitioned/non-partitioned node, 
    +     * 2) clusterResource updates
    +     * 3) queue guaranteed resource changed
    +     * 
    + * + * used capacity / absolute used capacity of queues are correctly updated. + */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", + "b" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + /** + * Initially, we set A/B's resource 50:50 + */ + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 50); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 50); + + csConf.setQueues(A, new String[] { "a1", "a2" }); + + final String A1 = A + ".a1"; + csConf.setCapacity(A1, 50); + csConf.setAccessibleNodeLabels(A1, toSet("x")); + csConf.setCapacityByLabel(A1, "x", 50); + + final String A2 = A + ".a2"; + csConf.setCapacity(A2, 50); + csConf.setAccessibleNodeLabels(A2, toSet("x")); + csConf.setCapacityByLabel(A2, "x", 50); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 50); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 50); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x")); + // Makes x to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + /* + * Before we adding any node to the cluster, used-capacity/abs-used-capacity + * should be 0 + */ + checkQueueUsedCapacity("a", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a", cs, "", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "", 0f, 0f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0f, 0f); + checkQueueUsedCapacity("root", cs, "", 0f, 0f); + + MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = + + /* + * After we adding nodes to the cluster, and before starting to use them, + * used-capacity/abs-used-capacity should be 0 + */ + checkQueueUsedCapacity("a", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a", cs, "", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "", 0f, 0f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0f, 0f); + checkQueueUsedCapacity("root", cs, "", 0f, 0f); + + // app1 -> a1 + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + // app1 asks for 1 partition= containers + am1.allocate("*", 1 * GB, 1, new ArrayList()); + + doNMHeartbeat(rm, nm2.getNodeId(), 10); + + // Now check usage, app1 uses: + // a1: used(no-label) = 80% + // abs-used(no-label) = 20% + // a: used(no-label) = 40% + // abs-used(no-label) = 20% + // root: used(no-label) = 20% + // abs-used(no-label) = 20% + checkQueueUsedCapacity("a", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f); + checkQueueUsedCapacity("a1", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f); + checkQueueUsedCapacity("a2", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "", 0f, 0f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0f, 0f); + checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f); + + // app1 asks for 2 partition=x containers + am1.allocate("*", 1 * GB, 2, new ArrayList(), "x"); + doNMHeartbeat(rm, nm1.getNodeId(), 10); + + // Now check usage, app1 uses: + // a1: used(x) = 80% + // abs-used(x) = 20% + // a: used(x) = 40% + // abs-used(x) = 20% + // root: used(x) = 20% + // abs-used(x) = 20% + checkQueueUsedCapacity("a", cs, "x", 0.4f, 0.2f); + checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f); + checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f); + checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f); + checkQueueUsedCapacity("a2", cs, "x", 0f, 0f); + checkQueueUsedCapacity("a2", cs, "", 0f, 0f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0.2f, 0.2f); + checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f); + + // submit an app to a2, uses 1 NON_PARTITIONED container and 1 PARTITIONED + // container + // app2 -> a2 + RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "a2"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + // app1 asks for 1 partition= containers + am2.allocate("*", 1 * GB, 1, new ArrayList(), "x"); + doNMHeartbeat(rm, nm1.getNodeId(), 10); + + // Now check usage, app1 uses: + // a2: used(x) = 40% + // abs-used(x) = 10% + // a: used(x) = 20% + // abs-used(x) = 10% + // root: used(x) = 10% + // abs-used(x) = 10% + checkQueueUsedCapacity("a", cs, "x", 0.6f, 0.3f); + checkQueueUsedCapacity("a", cs, "", 0.6f, 0.3f); + checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f); + checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f); + checkQueueUsedCapacity("a2", cs, "x", 0.4f, 0.1f); + checkQueueUsedCapacity("a2", cs, "", 0.4f, 0.1f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0.3f, 0.3f); + checkQueueUsedCapacity("root", cs, "", 0.3f, 0.3f); + + // Add nm3/nm4, double resource for both partitioned/non-partitioned + // resource, used capacity should be 1/2 of before + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h3", 0), toSet("x"))); + rm.registerNode("h3:1234", 10 * GB); // label = x + rm.registerNode("h4:1234", 10 * GB); // label = + + waitSchedulerNodeJoined(rm, 4); + + checkQueueUsedCapacity("a", cs, "x", 0.3f, 0.15f); + checkQueueUsedCapacity("a", cs, "", 0.3f, 0.15f); + checkQueueUsedCapacity("a1", cs, "x", 0.4f, 0.1f); + checkQueueUsedCapacity("a1", cs, "", 0.4f, 0.1f); + checkQueueUsedCapacity("a2", cs, "x", 0.2f, 0.05f); + checkQueueUsedCapacity("a2", cs, "", 0.2f, 0.05f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f); + checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f); + + // Reinitialize queue, makes A's capacity double, and B's capacity to be 0 + csConf.setCapacity(A, 100); // was 50 + csConf.setCapacityByLabel(A, "x", 100); // was 50 + csConf.setCapacity(B, 0); // was 50 + csConf.setCapacityByLabel(B, "x", 0); // was 50 + cs.reinitialize(csConf, rm.getRMContext()); + + checkQueueUsedCapacity("a", cs, "x", 0.15f, 0.15f); + checkQueueUsedCapacity("a", cs, "", 0.15f, 0.15f); + checkQueueUsedCapacity("a1", cs, "x", 0.2f, 0.1f); + checkQueueUsedCapacity("a1", cs, "", 0.2f, 0.1f); + checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f); + checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f); + checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f); + + // Release all task containers from a1, check usage + am1.allocate(null, Arrays.asList( + ContainerId.newContainerId(am1.getApplicationAttemptId(), 2), + ContainerId.newContainerId(am1.getApplicationAttemptId(), 3), + ContainerId.newContainerId(am1.getApplicationAttemptId(), 4))); + checkQueueUsedCapacity("a", cs, "x", 0.05f, 0.05f); + checkQueueUsedCapacity("a", cs, "", 0.10f, 0.10f); + checkQueueUsedCapacity("a1", cs, "x", 0.0f, 0.0f); + checkQueueUsedCapacity("a1", cs, "", 0.1f, 0.05f); + checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f); + checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f); + checkQueueUsedCapacity("b", cs, "x", 0f, 0f); + checkQueueUsedCapacity("b", cs, "", 0f, 0f); + checkQueueUsedCapacity("root", cs, "x", 0.05f, 0.05f); + checkQueueUsedCapacity("root", cs, "", 0.10f, 0.10f); + + rm.close(); + } + + @Test + public void testOrderOfAllocationOnPartitions() + throws Exception { + /** + * Test case: have a following queue structure: + * + *
    +     *                root
    +     *          ________________
    +     *         /     |     \    \
    +     *        a (x)  b (x)  c    d
    +     * 
    + * + * Both a/b can access x, we need to verify when + *
    +     * When doing allocation on partitioned nodes,
    +     *    - Queue has accessibility to the node will go first
    +     *    - When accessibility is same
    +     *      - Queue has less used_capacity on given partition will go first
    +     *      - When used_capacity is same
    +     *        - Queue has more abs_capacity will go first
    +     * 
    + * + * used capacity / absolute used capacity of queues are correctly updated. + */ + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(this.conf); + + // Define top-level queues + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a", + "b", "c", "d" }); + csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100); + + final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + csConf.setCapacity(A, 25); + csConf.setAccessibleNodeLabels(A, toSet("x")); + csConf.setCapacityByLabel(A, "x", 30); + + final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + csConf.setCapacity(B, 25); + csConf.setAccessibleNodeLabels(B, toSet("x")); + csConf.setCapacityByLabel(B, "x", 70); + + final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + csConf.setCapacity(C, 25); + + final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + csConf.setCapacity(D, 25); + + // set node -> label + mgr.addToCluserNodeLabels(ImmutableSet.of("x")); + // Makes x to be non-exclusive node labels + mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false))); + mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"))); + + // inject node label manager + MockRM rm = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + + rm.getRMContext().setNodeLabelManager(mgr); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + + MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x + MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = + + // app1 -> a + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2); + + // app2 -> b + RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + // app3 -> c + RMApp app3 = rm.submitApp(1 * GB, "app", "user", null, "c"); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm2); + + // app4 -> d + RMApp app4 = rm.submitApp(1 * GB, "app", "user", null, "d"); + MockAM am4 = MockRM.launchAndRegisterAM(app4, rm, nm2); + + // Test case 1 + // Both a/b has used_capacity(x) = 0, when doing exclusive allocation, b + // will go first since b has more capacity(x) + am1.allocate("*", 1 * GB, 1, new ArrayList(), "x"); + am2.allocate("*", 1 * GB, 1, new ArrayList(), "x"); + doNMHeartbeat(rm, nm1.getNodeId(), 1); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + + // Test case 2 + // Do another allocation, a will go first since it has 0 use_capacity(x) and + // b has 1/7 used_capacity(x) + am2.allocate("*", 1 * GB, 1, new ArrayList(), "x"); + doNMHeartbeat(rm, nm1.getNodeId(), 1); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + + // Test case 3 + // Just like above, when doing non-exclusive allocation, b will go first as well. + am1.allocate("*", 1 * GB, 1, new ArrayList(), ""); + am2.allocate("*", 1 * GB, 1, new ArrayList(), ""); + doNMHeartbeat(rm, nm1.getNodeId(), 2); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + + // Test case 4 + // After b allocated, we should be able to allocate non-exlusive container in a + doNMHeartbeat(rm, nm1.getNodeId(), 2); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + + // Test case 5 + // b/c/d asks non-exclusive container together, b will go first irrelated to + // used_capacity(x) + am2.allocate("*", 1 * GB, 1, new ArrayList(), ""); + am3.allocate("*", 1 * GB, 2, new ArrayList(), ""); + am4.allocate("*", 1 * GB, 2, new ArrayList(), ""); + doNMHeartbeat(rm, nm1.getNodeId(), 2); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), + cs.getApplicationAttempt(am3.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), + cs.getApplicationAttempt(am4.getApplicationAttemptId())); + + // Test case 6 + // After b allocated, c will go first by lexicographic order + doNMHeartbeat(rm, nm1.getNodeId(), 1); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am3.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), + cs.getApplicationAttempt(am4.getApplicationAttemptId())); + + // Test case 7 + // After c allocated, d will go first because it has less used_capacity(x) + // than c + doNMHeartbeat(rm, nm1.getNodeId(), 2); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am3.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am4.getApplicationAttemptId())); + + // Test case 8 + // After d allocated, c will go first, c/d has same use_capacity(x), so compare c/d's lexicographic order + doNMHeartbeat(rm, nm1.getNodeId(), 1); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am1.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(), + cs.getApplicationAttempt(am2.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(), + cs.getApplicationAttempt(am3.getApplicationAttemptId())); + checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(), + cs.getApplicationAttempt(am4.getApplicationAttemptId())); + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 52d0bc1241b..bdbd1687f35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -92,8 +92,8 @@ public class TestParentQueue { thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32)); when(csContext.getApplicationComparator()). thenReturn(CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()). - thenReturn(CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()). + thenReturn(CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()). thenReturn(resourceComparator); when(csContext.getRMContext()).thenReturn(rmContext); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index 47be6180988..fc546ee30ca 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -122,8 +122,8 @@ public class TestReservations { Resources.createResource(100 * 16 * GB, 100 * 12)); when(csContext.getApplicationComparator()).thenReturn( CapacityScheduler.applicationComparator); - when(csContext.getQueueComparator()).thenReturn( - CapacityScheduler.queueComparator); + when(csContext.getNonPartitionedQueueComparator()).thenReturn( + CapacityScheduler.nonPartitionedQueueComparator); when(csContext.getResourceCalculator()).thenReturn(resourceCalculator); when(csContext.getRMContext()).thenReturn(rmContext); RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java index 77eebdf7466..e4583d1d49c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.Task; import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; @@ -301,6 +302,9 @@ public class TestFifoScheduler { scheduler); ((RMContextImpl) rmContext).setSystemMetricsPublisher( mock(SystemMetricsPublisher.class)); + NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager(); + nlm.init(new Configuration()); + rmContext.setNodeLabelManager(nlm); scheduler.setRMContext(rmContext); scheduler.init(conf);