YARN-2696. Queue sorting in CapacityScheduler should consider node label. Contributed by Wangda Tan

Jian He 2015-04-17 13:36:46 -07:00
parent 1db355a875
commit d573f09fb9
21 changed files with 722 additions and 83 deletions

View File

@ -139,6 +139,9 @@ Release 2.8.0 - UNRELEASED
YARN-3404. Display queue name on application page. (Ryu Kobayashi via jianhe)
YARN-2696. Queue sorting in CapacityScheduler should consider node label.
(Wangda Tan via jianhe)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -141,10 +141,14 @@
<Class name="org.apache.hadoop.yarn.server.resourcemanager.resource.Priority$Comparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<Match>
<Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PartitionedQueueComparator" />
<Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
</Match>
<!-- Ignore some irrelevant class name warning -->
<Match>
<Class name="org.apache.hadoop.yarn.api.records.SerializedException" />

View File

@ -254,7 +254,7 @@ public void deactivateNode(NodeId nodeId) {
}
}
public void updateNodeResource(NodeId node, Resource newResource) throws IOException {
public void updateNodeResource(NodeId node, Resource newResource) {
deactivateNode(node);
activateNode(node, newResource);
}
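
The resource update is deliberately implemented as a remove-then-add, so every label's aggregate resource is re-derived rather than patched in place. A hypothetical call (node id and resource values are made up):

// Hypothetical usage, not part of the patch; values are made up.
nodeLabelsManager.updateNodeResource(NodeId.newInstance("h1", 0),
    Resource.newInstance(20 * 1024, 20));
// internally: deactivateNode(node) followed by activateNode(node, newResource)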

View File

@ -548,6 +548,10 @@ public synchronized void updateNodeResource(RMNode nm,
Resource newResource = resourceOption.getResource();
Resource oldResource = node.getTotalResource();
if(!oldResource.equals(newResource)) {
// Notify NodeLabelsManager about this change
rmContext.getNodeLabelManager().updateNodeResource(nm.getNodeID(),
newResource);
// Log resource change
LOG.info("Update resource on node: " + node.getNodeName()
+ " from: " + oldResource + ", to: "

View File

@ -21,6 +21,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@ -372,4 +373,13 @@ public String toString() {
readLock.unlock();
}
}
public Set<String> getNodePartitionsSet() {
try {
readLock.lock();
return usages.keySet();
} finally {
readLock.unlock();
}
}
}

View File

@ -271,8 +271,8 @@ synchronized void setupQueueConfigs(Resource clusterResource)
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
// Update metrics
CSQueueUtils.updateQueueStatistics(
resourceCalculator, this, parent, clusterResource, minimumAllocation);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, null);
// Check if labels of this queue are a subset of parent queue's labels,
// only do this when we are not root
@ -351,16 +351,16 @@ synchronized void allocateResource(Resource clusterResource,
queueUsage.incUsed(nodePartition, resource);
++numContainers;
CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
clusterResource, minimumAllocation);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
}
protected synchronized void releaseResource(Resource clusterResource,
Resource resource, String nodePartition) {
queueUsage.decUsed(nodePartition, resource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
clusterResource, minimumAllocation);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
--numContainers;
}
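
The nodePartition argument scopes the refresh: a container event touches one partition, so only that partition's statistics are recomputed, while null forces a recompute of every partition. A hedged sketch of the two call shapes (partition name "x" is assumed):

// After allocate/release on partition "x": refresh only that partition.
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
    minimumAllocation, this, labelManager, "x");
// On cluster resource or configuration changes: refresh all partitions.
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
    minimumAllocation, this, labelManager, null);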

View File

@ -20,18 +20,17 @@
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
class CSQueueUtils {
private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
final static float EPSILON = 0.0001f;
@ -188,41 +187,103 @@ private static void updateAbsoluteCapacitiesByNodeLabels(
}
}
@Lock(CSQueue.class)
public static void updateQueueStatistics(
final ResourceCalculator calculator,
final CSQueue childQueue, final CSQueue parentQueue,
final Resource clusterResource, final Resource minimumAllocation) {
Resource queueLimit = Resources.none();
Resource usedResources = childQueue.getUsedResources();
/**
* Update partitioned resource usage; if nodePartition == null, used
* resource for all partitions of this queue will be updated.
*/
private static void updateUsedCapacity(final ResourceCalculator rc,
final Resource totalPartitionResource, final Resource minimumAllocation,
ResourceUsage queueResourceUsage, QueueCapacities queueCapacities,
String nodePartition) {
float absoluteUsedCapacity = 0.0f;
float usedCapacity = 0.0f;
if (Resources.greaterThan(
calculator, clusterResource, clusterResource, Resources.none())) {
queueLimit =
Resources.multiply(clusterResource, childQueue.getAbsoluteCapacity());
absoluteUsedCapacity =
Resources.divide(calculator, clusterResource,
usedResources, clusterResource);
usedCapacity =
Resources.equals(queueLimit, Resources.none()) ? 0 :
Resources.divide(calculator, clusterResource,
usedResources, queueLimit);
if (Resources.greaterThan(rc, totalPartitionResource,
totalPartitionResource, Resources.none())) {
// queueGuaranteed = totalPartitionedResource *
// absolute_capacity(partition)
Resource queueGuranteedResource =
Resources.multiply(totalPartitionResource,
queueCapacities.getAbsoluteCapacity(nodePartition));
// make queueGuranteed >= minimum_allocation to avoid division by zero.
queueGuranteedResource =
Resources.max(rc, totalPartitionResource, queueGuranteedResource,
minimumAllocation);
Resource usedResource = queueResourceUsage.getUsed(nodePartition);
absoluteUsedCapacity =
Resources.divide(rc, totalPartitionResource, usedResource,
totalPartitionResource);
usedCapacity =
Resources.divide(rc, totalPartitionResource, usedResource,
queueGuranteedResource);
}
queueCapacities
.setAbsoluteUsedCapacity(nodePartition, absoluteUsedCapacity);
queueCapacities.setUsedCapacity(nodePartition, usedCapacity);
}
private static Resource getNonPartitionedMaxAvailableResourceToQueue(
final ResourceCalculator rc, Resource totalNonPartitionedResource,
CSQueue queue) {
Resource queueLimit = Resources.none();
Resource usedResources = queue.getUsedResources();
if (Resources.greaterThan(rc, totalNonPartitionedResource,
totalNonPartitionedResource, Resources.none())) {
queueLimit =
Resources.multiply(totalNonPartitionedResource,
queue.getAbsoluteCapacity());
}
childQueue.setUsedCapacity(usedCapacity);
childQueue.setAbsoluteUsedCapacity(absoluteUsedCapacity);
Resource available = Resources.subtract(queueLimit, usedResources);
childQueue.getMetrics().setAvailableResourcesToQueue(
Resources.max(
calculator,
clusterResource,
available,
Resources.none()
)
);
return Resources.max(rc, totalNonPartitionedResource, available,
Resources.none());
}
/**
* <p>
* Update Queue Statistics:
* </p>
*
* <li>used-capacity/absolute-used-capacity by partition</li>
* <li>non-partitioned max-avail-resource to queue</li>
*
* <p>
* When nodePartition is null, used-capacity/absolute-used-capacity will
* be updated for all partitions.
* </p>
*/
@Lock(CSQueue.class)
public static void updateQueueStatistics(
final ResourceCalculator rc, final Resource cluster, final Resource minimumAllocation,
final CSQueue childQueue, final RMNodeLabelsManager nlm,
final String nodePartition) {
QueueCapacities queueCapacities = childQueue.getQueueCapacities();
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
if (nodePartition == null) {
for (String partition : Sets.union(
queueCapacities.getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
minimumAllocation, queueResourceUsage, queueCapacities, partition);
}
} else {
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
minimumAllocation, queueResourceUsage, queueCapacities, nodePartition);
}
// Now in QueueMetrics, we only store available-resource-to-queue for
// default partition.
if (nodePartition == null
|| nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
childQueue.getMetrics().setAvailableResourcesToQueue(
getNonPartitionedMaxAvailableResourceToQueue(rc,
nlm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, cluster),
childQueue));
}
}
}
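
To make the arithmetic in updateUsedCapacity concrete, a standalone sketch with assumed numbers (a 10GB partition, a queue guaranteed 25% of it, 2GB used, and a 1GB minimum allocation; values are illustrative only, not scheduler code):

// Illustrative only; mirrors updateUsedCapacity's math with scalar floats.
float totalPartitionResource = 10 * 1024;     // 10GB in the partition, in MB
float absCapacity = 0.25f;                    // queue's absolute capacity there
float minimumAllocation = 1024;               // assumed 1GB minimum allocation
// guaranteed resource, clamped to >= minimum allocation to avoid dividing by 0
float guaranteed = Math.max(totalPartitionResource * absCapacity,
    minimumAllocation);                       // 2560MB
float used = 2 * 1024;                        // 2GB currently used
float usedCapacity = used / guaranteed;                     // 2048/2560 = 0.8
float absoluteUsedCapacity = used / totalPartitionResource; // 2048/10240 = 0.2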

View File

@ -136,7 +136,8 @@ public class CapacityScheduler extends
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
static final Comparator<CSQueue> nonPartitionedQueueComparator =
new Comparator<CSQueue>() {
@Override
public int compare(CSQueue q1, CSQueue q2) {
if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
@ -148,6 +149,9 @@ public int compare(CSQueue q1, CSQueue q2) {
return q1.getQueuePath().compareTo(q2.getQueuePath());
}
};
static final PartitionedQueueComparator partitionedQueueComparator =
new PartitionedQueueComparator();
static final Comparator<FiCaSchedulerApp> applicationComparator =
new Comparator<FiCaSchedulerApp>() {
@ -274,8 +278,13 @@ public ResourceCalculator getResourceCalculator() {
}
@Override
public Comparator<CSQueue> getQueueComparator() {
return queueComparator;
public Comparator<CSQueue> getNonPartitionedQueueComparator() {
return nonPartitionedQueueComparator;
}
@Override
public PartitionedQueueComparator getPartitionedQueueComparator() {
return partitionedQueueComparator;
}
@Override

View File

@ -58,7 +58,9 @@ public interface CapacitySchedulerContext {
ResourceCalculator getResourceCalculator();
Comparator<CSQueue> getQueueComparator();
Comparator<CSQueue> getNonPartitionedQueueComparator();
PartitionedQueueComparator getPartitionedQueueComparator();
FiCaSchedulerNode getNode(NodeId nodeId);
}

View File

@ -1814,9 +1814,8 @@ public synchronized void updateClusterResource(Resource clusterResource,
setQueueResourceLimitsInfo(clusterResource);
// Update metrics
CSQueueUtils.updateQueueStatistics(
resourceCalculator, this, getParent(), clusterResource,
minimumAllocation);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, null);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible

View File

@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
@ -68,7 +69,8 @@ public class ParentQueue extends AbstractCSQueue {
protected final Set<CSQueue> childQueues;
private final boolean rootQueue;
final Comparator<CSQueue> queueComparator;
final Comparator<CSQueue> nonPartitionedQueueComparator;
final PartitionedQueueComparator partitionQueueComparator;
volatile int numApplications;
private final CapacitySchedulerContext scheduler;
@ -79,7 +81,8 @@ public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.scheduler = cs;
this.queueComparator = cs.getQueueComparator();
this.nonPartitionedQueueComparator = cs.getNonPartitionedQueueComparator();
this.partitionQueueComparator = cs.getPartitionedQueueComparator();
this.rootQueue = (parent == null);
@ -92,7 +95,7 @@ public ParentQueue(CapacitySchedulerContext cs,
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
this.childQueues = new TreeSet<CSQueue>(queueComparator);
this.childQueues = new TreeSet<CSQueue>(nonPartitionedQueueComparator);
setupQueueConfigs(cs.getClusterResource());
@ -522,6 +525,17 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child,
return new ResourceLimits(childLimit);
}
private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
return childQueues.iterator();
}
partitionQueueComparator.setPartitionToLookAt(node.getPartition());
List<CSQueue> childrenList = new ArrayList<>(childQueues);
Collections.sort(childrenList, partitionQueueComparator);
return childrenList.iterator();
}
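
Design note: childQueues is a TreeSet whose ordering invariant is the non-partitioned comparator, so it cannot be re-sorted by a different key in place; the partitioned order is instead computed on demand over a copy. Also, setPartitionToLookAt mutates the comparator instance shared through the scheduler context, which appears safe only because node updates are processed serially on the scheduler's allocation path.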
private synchronized CSAssignment assignContainersToChildQueues(
Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
SchedulingMode schedulingMode) {
@ -531,7 +545,8 @@ private synchronized CSAssignment assignContainersToChildQueues(
printChildQueues();
// Try to assign to the most 'under-served' sub-queue
for (Iterator<CSQueue> iter = childQueues.iterator(); iter.hasNext();) {
for (Iterator<CSQueue> iter = sortAndGetChildrenAllocationIterator(node); iter
.hasNext();) {
CSQueue childQueue = iter.next();
if(LOG.isDebugEnabled()) {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
@ -554,13 +569,17 @@ private synchronized CSAssignment assignContainersToChildQueues(
if (Resources.greaterThan(
resourceCalculator, cluster,
assignment.getResource(), Resources.none())) {
// Remove and re-insert to sort
iter.remove();
LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
" stats: " + childQueue);
childQueues.add(childQueue);
if (LOG.isDebugEnabled()) {
printChildQueues();
// Only re-sort childQueues when doing non-partitioned node allocation;
// the partitioned order is recomputed on every partitioned heartbeat.
if (RMNodeLabelsManager.NO_LABEL.equals(node.getPartition())) {
// Remove and re-insert to sort
iter.remove();
LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
childQueues.add(childQueue);
if (LOG.isDebugEnabled()) {
printChildQueues();
}
}
break;
}
@ -647,9 +666,8 @@ public synchronized void updateClusterResource(Resource clusterResource,
childQueue.updateClusterResource(clusterResource, childLimits);
}
// Update metrics
CSQueueUtils.updateQueueStatistics(
resourceCalculator, this, parent, clusterResource, minimumAllocation);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, null);
}
@Override

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.Comparator;
public class PartitionedQueueComparator implements Comparator<CSQueue> {
private String partitionToLookAt = null;
public void setPartitionToLookAt(String partitionToLookAt) {
this.partitionToLookAt = partitionToLookAt;
}
@Override
public int compare(CSQueue q1, CSQueue q2) {
/*
* 1. Check accessibility to the given partition: if one queue is
* accessible and the other is not, the accessible queue goes first.
*/
boolean q1Accessible =
q1.getAccessibleNodeLabels().contains(partitionToLookAt);
boolean q2Accessible =
q2.getAccessibleNodeLabels().contains(partitionToLookAt);
if (q1Accessible && !q2Accessible) {
return -1;
} else if (!q1Accessible && q2Accessible) {
return 1;
}
/*
*
* 2. When the two queues have the same accessibility, decide who goes
* first: for now we simply compare their used capacity on the partition
* being looked at.
*/
float used1 = q1.getQueueCapacities().getUsedCapacity(partitionToLookAt);
float used2 = q2.getQueueCapacities().getUsedCapacity(partitionToLookAt);
if (Math.abs(used1 - used2) < 1e-6) {
// When used capacity is the same, compare their guaranteed capacity
float cap1 = q1.getQueueCapacities().getCapacity(partitionToLookAt);
float cap2 = q2.getQueueCapacities().getCapacity(partitionToLookAt);
// when cap1 == cap2, compare queue names
if (Math.abs(cap1 - cap2) < 1e-6) {
return q1.getQueueName().compareTo(q2.getQueueName());
}
return Float.compare(cap2, cap1);
}
return Float.compare(used1, used2);
}
}
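
A hypothetical, test-style sketch of driving the comparator (using Mockito's mock/when and Guava's ImmutableSet, as the tests further down do; queue names and numbers are made up):

// Hypothetical ordering check; not part of the patch.
PartitionedQueueComparator cmp = new PartitionedQueueComparator();
cmp.setPartitionToLookAt("x");
CSQueue a = mock(CSQueue.class);
QueueCapacities aCaps = mock(QueueCapacities.class);
when(a.getQueueName()).thenReturn("a");
when(a.getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x"));
when(a.getQueueCapacities()).thenReturn(aCaps);
when(aCaps.getUsedCapacity("x")).thenReturn(0.4f);
CSQueue b = mock(CSQueue.class);
QueueCapacities bCaps = mock(QueueCapacities.class);
when(b.getQueueName()).thenReturn("b");
when(b.getAccessibleNodeLabels()).thenReturn(ImmutableSet.of("x"));
when(b.getQueueCapacities()).thenReturn(bCaps);
when(bCaps.getUsedCapacity("x")).thenReturn(0.1f);
// b is less used on "x", so it sorts ahead of a.
Assert.assertTrue(cmp.compare(a, b) > 0);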

View File

@ -30,8 +30,6 @@
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import com.google.common.collect.Sets;
public class QueueCapacities {
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
private static final float LABEL_DOESNT_EXIST_CAP = 0f;
@ -254,4 +252,13 @@ public String toString() {
readLock.unlock();
}
}
public Set<String> getNodePartitionsSet() {
try {
readLock.lock();
return capacitiesMap.keySet();
} finally {
readLock.unlock();
}
}
}

View File

@ -63,10 +63,9 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
+ " from " + newlyParsedQueue.getQueuePath());
}
super.reinitialize(newlyParsedQueue, clusterResource);
CSQueueUtils.updateQueueStatistics(
parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
parent, parent.schedulerContext.getClusterResource(),
parent.schedulerContext.getMinimumResourceCapability());
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, null);
updateQuotas(parent.getUserLimitForReservation(),
parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(),

View File

@ -95,8 +95,8 @@ public void setUp() throws IOException {
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@ -255,8 +255,8 @@ public void testLimitsComputation() throws Exception {
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@ -554,8 +554,8 @@ public void testHeadroom() throws Exception {
thenReturn(Resources.createResource(16*GB));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -96,8 +96,8 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -152,8 +152,8 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -1024,4 +1024,455 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close();
}
private void checkQueueUsedCapacity(String queueName, CapacityScheduler cs,
String nodePartition, float usedCapacity, float absoluteUsedCapacity) {
float epsilon = 1e-6f;
CSQueue queue = cs.getQueue(queueName);
Assert.assertNotNull("Failed to get queue=" + queueName, queue);
Assert.assertEquals(usedCapacity, queue.getQueueCapacities()
.getUsedCapacity(nodePartition), epsilon);
Assert.assertEquals(absoluteUsedCapacity, queue.getQueueCapacities()
.getAbsoluteUsedCapacity(nodePartition), epsilon);
}
private void doNMHeartbeat(MockRM rm, NodeId nodeId, int nHeartbeat) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nodeId);
for (int i = 0; i < nHeartbeat; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
}
private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
throws InterruptedException {
int totalWaitTick = 100; // wait 10 sec at most.
while (expectedNodeNum > rm.getResourceScheduler().getNumClusterNodes()
&& totalWaitTick > 0) {
Thread.sleep(100);
totalWaitTick--;
}
}
@Test
public void testQueueUsedCapacitiesUpdate()
throws Exception {
/**
* Test case: given the following queue structure:
*
* <pre>
* root
* / \
* a b
* / \ (x)
* a1 a2
* (x) (x)
* </pre>
*
* Both a and b can access x; we need to verify that when
* <pre>
* 1) containers are allocated/released on both partitioned and
*    non-partitioned nodes,
* 2) clusterResource updates,
* 3) queue guaranteed resource changes,
* </pre>
*
* used capacity / absolute used capacity of queues are correctly updated.
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
"b" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
/**
* Initially, we set A/B's resource 50:50
*/
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 50);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setQueues(A, new String[] { "a1", "a2" });
final String A1 = A + ".a1";
csConf.setCapacity(A1, 50);
csConf.setAccessibleNodeLabels(A1, toSet("x"));
csConf.setCapacityByLabel(A1, "x", 50);
final String A2 = A + ".a2";
csConf.setCapacity(A2, 50);
csConf.setAccessibleNodeLabels(A2, toSet("x"));
csConf.setCapacityByLabel(A2, "x", 50);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
// Make x a non-exclusive node label
mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
/*
* Before we add any node to the cluster, used-capacity/abs-used-capacity
* should be 0
*/
checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a", cs, "", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
checkQueueUsedCapacity("root", cs, "", 0f, 0f);
MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
/*
* After we add nodes to the cluster, and before we start using them,
* used-capacity/abs-used-capacity should be 0
*/
checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a", cs, "", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
checkQueueUsedCapacity("root", cs, "", 0f, 0f);
// app1 -> a1
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
// app1 asks for 1 partition="" (default partition) container
am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
doNMHeartbeat(rm, nm2.getNodeId(), 10);
// Now check usage, app1 uses:
// a1: used(no-label) = 80%
// abs-used(no-label) = 20%
// a: used(no-label) = 40%
// abs-used(no-label) = 20%
// root: used(no-label) = 20%
// abs-used(no-label) = 20%
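// (Derivation: the only usable resource for partition "" is nm2's 10GB.
// a1's guaranteed share is 10GB * 0.5 * 0.5 = 2.5GB and it holds 2GB
// (AM + one task), so used = 2/2.5 = 80% and abs-used = 2/10 = 20%; the
// same 2GB against a's 5GB and root's 10GB gives the values below.)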
checkQueueUsedCapacity("a", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
checkQueueUsedCapacity("a1", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0f, 0f);
checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
// app1 asks for 2 partition=x containers
am1.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
// Now check usage, app1 uses:
// a1: used(x) = 80%
// abs-used(x) = 20%
// a: used(x) = 40%
// abs-used(x) = 20%
// root: used(x) = 20%
// abs-used(x) = 20%
checkQueueUsedCapacity("a", cs, "x", 0.4f, 0.2f);
checkQueueUsedCapacity("a", cs, "", 0.4f, 0.2f);
checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
checkQueueUsedCapacity("a2", cs, "x", 0f, 0f);
checkQueueUsedCapacity("a2", cs, "", 0f, 0f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.2f, 0.2f);
checkQueueUsedCapacity("root", cs, "", 0.2f, 0.2f);
// submit an app to a2, uses 1 NON_PARTITIONED container and 1 PARTITIONED
// container
// app2 -> a2
RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "a2");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// app2 asks for 1 partition=x container
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 10);
// Now check usage; app2 adds on top of app1's usage:
// a2: used(x) = 40%
// abs-used(x) = 10%
// a: used(x) = +20% (0.6 in total)
// abs-used(x) = +10% (0.3 in total)
// root: used(x) = +10% (0.3 in total)
// abs-used(x) = +10% (0.3 in total)
checkQueueUsedCapacity("a", cs, "x", 0.6f, 0.3f);
checkQueueUsedCapacity("a", cs, "", 0.6f, 0.3f);
checkQueueUsedCapacity("a1", cs, "x", 0.8f, 0.2f);
checkQueueUsedCapacity("a1", cs, "", 0.8f, 0.2f);
checkQueueUsedCapacity("a2", cs, "x", 0.4f, 0.1f);
checkQueueUsedCapacity("a2", cs, "", 0.4f, 0.1f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.3f, 0.3f);
checkQueueUsedCapacity("root", cs, "", 0.3f, 0.3f);
// Add nm3/nm4, doubling both partitioned and non-partitioned resources;
// used capacity should be 1/2 of before
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h3", 0), toSet("x")));
rm.registerNode("h3:1234", 10 * GB); // label = x
rm.registerNode("h4:1234", 10 * GB); // label = <empty>
waitSchedulerNodeJoined(rm, 4);
checkQueueUsedCapacity("a", cs, "x", 0.3f, 0.15f);
checkQueueUsedCapacity("a", cs, "", 0.3f, 0.15f);
checkQueueUsedCapacity("a1", cs, "x", 0.4f, 0.1f);
checkQueueUsedCapacity("a1", cs, "", 0.4f, 0.1f);
checkQueueUsedCapacity("a2", cs, "x", 0.2f, 0.05f);
checkQueueUsedCapacity("a2", cs, "", 0.2f, 0.05f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
// Reinitialize queues: double A's capacity and set B's capacity to 0
csConf.setCapacity(A, 100); // was 50
csConf.setCapacityByLabel(A, "x", 100); // was 50
csConf.setCapacity(B, 0); // was 50
csConf.setCapacityByLabel(B, "x", 0); // was 50
cs.reinitialize(csConf, rm.getRMContext());
checkQueueUsedCapacity("a", cs, "x", 0.15f, 0.15f);
checkQueueUsedCapacity("a", cs, "", 0.15f, 0.15f);
checkQueueUsedCapacity("a1", cs, "x", 0.2f, 0.1f);
checkQueueUsedCapacity("a1", cs, "", 0.2f, 0.1f);
checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.15f, 0.15f);
checkQueueUsedCapacity("root", cs, "", 0.15f, 0.15f);
// Release all task containers from a1, check usage
am1.allocate(null, Arrays.asList(
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2),
ContainerId.newContainerId(am1.getApplicationAttemptId(), 3),
ContainerId.newContainerId(am1.getApplicationAttemptId(), 4)));
checkQueueUsedCapacity("a", cs, "x", 0.05f, 0.05f);
checkQueueUsedCapacity("a", cs, "", 0.10f, 0.10f);
checkQueueUsedCapacity("a1", cs, "x", 0.0f, 0.0f);
checkQueueUsedCapacity("a1", cs, "", 0.1f, 0.05f);
checkQueueUsedCapacity("a2", cs, "x", 0.1f, 0.05f);
checkQueueUsedCapacity("a2", cs, "", 0.1f, 0.05f);
checkQueueUsedCapacity("b", cs, "x", 0f, 0f);
checkQueueUsedCapacity("b", cs, "", 0f, 0f);
checkQueueUsedCapacity("root", cs, "x", 0.05f, 0.05f);
checkQueueUsedCapacity("root", cs, "", 0.10f, 0.10f);
rm.close();
}
@Test
public void testOrderOfAllocationOnPartitions()
throws Exception {
/**
* Test case: given the following queue structure:
*
* <pre>
* root
* ________________
* / | \ \
* a (x) b (x) c d
* </pre>
*
* Both a and b can access x; we need to verify the allocation order:
* <pre>
* When doing allocation on partitioned nodes,
* - a queue accessible to the node's partition goes first
* - when accessibility is the same,
*   the queue with less used_capacity on the given partition goes first
* - when used_capacity is the same,
*   the queue with more abs_capacity goes first
* </pre>
*/
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration(this.conf);
// Define top-level queues
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
"b", "c", "d" });
csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
csConf.setCapacity(A, 25);
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 30);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 25);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 70);
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
csConf.setCapacity(C, 25);
final String D = CapacitySchedulerConfiguration.ROOT + ".d";
csConf.setCapacity(D, 25);
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
// Make x a non-exclusive node label
mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
// inject node label manager
MockRM rm = new MockRM(csConf) {
@Override
public RMNodeLabelsManager createNodeLabelManager() {
return mgr;
}
};
rm.getRMContext().setNodeLabelManager(mgr);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("h1:1234", 10 * GB); // label = x
MockNM nm2 = rm.registerNode("h2:1234", 10 * GB); // label = <empty>
// app1 -> a
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
// app2 -> b
RMApp app2 = rm.submitApp(1 * GB, "app", "user", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// app3 -> c
RMApp app3 = rm.submitApp(1 * GB, "app", "user", null, "c");
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm2);
// app4 -> d
RMApp app4 = rm.submitApp(1 * GB, "app", "user", null, "d");
MockAM am4 = MockRM.launchAndRegisterAM(app4, rm, nm2);
// Test case 1
// Both a and b have used_capacity(x) = 0; when doing exclusive allocation,
// b goes first since it has more capacity(x)
am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 1);
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
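// (Both have used_capacity(x) = 0, so the comparator falls back to
// guaranteed capacity on x: b's 70 beats a's 30.)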
// Test case 2
// Do another allocation; a goes first since it has 0 used_capacity(x)
// while b has 1/7 used_capacity(x)
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "x");
doNMHeartbeat(rm, nm1.getNodeId(), 1);
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
// Test case 3
// Just like above, when doing non-exclusive allocation, b will go first as well.
am1.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
doNMHeartbeat(rm, nm1.getNodeId(), 2);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
// Test case 4
// After b's allocation, we should be able to allocate a non-exclusive
// container in a
doNMHeartbeat(rm, nm1.getNodeId(), 2);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
// Test case 5
// b/c/d ask for non-exclusive containers together; b goes first regardless
// of used_capacity(x), since it is the only one of the three that can
// access x
am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>(), "");
am3.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
am4.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>(), "");
doNMHeartbeat(rm, nm1.getNodeId(), 2);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
cs.getApplicationAttempt(am3.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
cs.getApplicationAttempt(am4.getApplicationAttemptId()));
// Test case 6
// After b's allocation, c goes first by lexicographic order (c and d tie
// on both used and guaranteed capacity for x)
doNMHeartbeat(rm, nm1.getNodeId(), 1);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am3.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(),
cs.getApplicationAttempt(am4.getApplicationAttemptId()));
// Test case 7
// After c's allocation, d goes first because it has less used_capacity(x)
// than c
doNMHeartbeat(rm, nm1.getNodeId(), 2);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am3.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am4.getApplicationAttemptId()));
// Test case 8
// After d's allocation, c goes first: c and d have the same
// used_capacity(x), so their names are compared lexicographically
doNMHeartbeat(rm, nm1.getNodeId(), 1);
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am1.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(3, nm1.getNodeId(),
cs.getApplicationAttempt(am2.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(2, nm1.getNodeId(),
cs.getApplicationAttempt(am3.getApplicationAttemptId()));
checkNumOfContainersInAnAppOnGivenNode(1, nm1.getNodeId(),
cs.getApplicationAttempt(am4.getApplicationAttemptId()));
}
}

View File

@ -92,8 +92,8 @@ public void setUp() throws Exception {
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getApplicationComparator()).
thenReturn(CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).
thenReturn(CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).
thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).
thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);

View File

@ -122,8 +122,8 @@ private void setup(CapacitySchedulerConfiguration csConf) throws Exception {
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getApplicationComparator()).thenReturn(
CapacityScheduler.applicationComparator);
when(csContext.getQueueComparator()).thenReturn(
CapacityScheduler.queueComparator);
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
CapacityScheduler.nonPartitionedQueueComparator);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
RMContainerTokenSecretManager containerTokenSecretManager = new RMContainerTokenSecretManager(

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@ -301,6 +302,9 @@ public Map<NodeId, FiCaSchedulerNode> getNodes(){
scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
NullRMNodeLabelsManager nlm = new NullRMNodeLabelsManager();
nlm.init(new Configuration());
rmContext.setNodeLabelManager(nlm);
scheduler.setRMContext(rmContext);
scheduler.init(conf);