YARN-6471. Support to add min/max resource configuration for a queue. (Sunil G via wangda)

Change-Id: I9213f5297a6841fab5c573e85ee4c4e5f4a0b7ff
Author: Wangda Tan, 2017-08-11 10:30:23 -07:00
Parent: d6c31a3e6b
Commit: 5e798b1a0d
35 changed files with 1831 additions and 430 deletions

View File

@ -1152,4 +1152,35 @@ public static boolean equalsIgnoreCase(String s1, String s2) {
return s1.equalsIgnoreCase(s2);
}
/**
* <p>Checks if the String contains only unicode letters.</p>
*
* <p><code>null</code> will return <code>false</code>.
* An empty String (length()=0) will return <code>true</code>.</p>
*
* <pre>
* StringUtils.isAlpha(null) = false
* StringUtils.isAlpha("") = true
* StringUtils.isAlpha(" ") = false
* StringUtils.isAlpha("abc") = true
* StringUtils.isAlpha("ab2c") = false
* StringUtils.isAlpha("ab-c") = false
* </pre>
*
* @param str the String to check, may be null
* @return <code>true</code> if only contains letters, and is non-null
*/
public static boolean isAlpha(String str) {
if (str == null) {
return false;
}
int sz = str.length();
for (int i = 0; i < sz; i++) {
if (!Character.isLetter(str.charAt(i))) {
return false;
}
}
return true;
}
}
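The Javadoc above already enumerates the expected results; for completeness, a hypothetical JUnit 4 check that mirrors those examples. The package of this StringUtils class is not shown in the hunk, so org.apache.hadoop.util.StringUtils is assumed.

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.hadoop.util.StringUtils; // assumed package; not shown in the hunk
import org.junit.Test;

// Hypothetical sanity test mirroring the Javadoc examples for isAlpha().
public class TestIsAlphaExamples {
  @Test
  public void testIsAlphaExamples() {
    assertFalse(StringUtils.isAlpha(null));   // null -> false
    assertTrue(StringUtils.isAlpha(""));      // empty -> true
    assertFalse(StringUtils.isAlpha(" "));    // whitespace is not a letter
    assertTrue(StringUtils.isAlpha("abc"));
    assertFalse(StringUtils.isAlpha("ab2c")); // digits are not letters
    assertFalse(StringUtils.isAlpha("ab-c")); // punctuation is not a letter
  }
}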

View File

@ -131,4 +131,10 @@ public boolean fitsIn(Resource smaller, Resource bigger) {
public boolean isAnyMajorResourceZero(Resource resource) {
return resource.getMemorySize() == 0f;
}
@Override
public Resource normalizeDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown((r.getMemorySize()), stepFactor.getMemorySize()));
}
}

View File

@ -567,4 +567,11 @@ public boolean isAnyMajorResourceZero(Resource resource) {
}
return false;
}
@Override
public Resource normalizeDown(Resource r, Resource stepFactor) {
return Resources.createResource(
roundDown(r.getMemorySize(), stepFactor.getMemorySize()),
roundDown(r.getVirtualCores(), stepFactor.getVirtualCores()));
}
}

View File

@ -235,4 +235,16 @@ public abstract float divide(
* @return returns true if any resource is zero.
*/
public abstract boolean isAnyMajorResourceZero(Resource resource);
/**
* Get resource <code>r</code> and normalize it down using the step factor
* <code>stepFactor</code>.
*
* @param r
* resource to be normalized down
* @param stepFactor
* factor by which to normalize down
* @return resulting normalized resource
*/
public abstract Resource normalizeDown(Resource r, Resource stepFactor);
}
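A standalone sketch (not the Hadoop implementation) of the round-down semantics this new contract is expected to provide: each dimension is rounded down to the nearest multiple of the corresponding step-factor component. The numbers below are invented.

// Illustration of normalize-down semantics with invented numbers.
public final class NormalizeDownSketch {

  // Round value down to the nearest multiple of step (step > 0 assumed).
  static long roundDown(long value, long step) {
    return (value / step) * step;
  }

  public static void main(String[] args) {
    // Memory: 4500 MB with a 1024 MB step factor -> 4096 MB.
    System.out.println(roundDown(4500, 1024)); // 4096
    // Vcores: 7 with a step factor of 2 -> 6.
    System.out.println(roundDown(7, 2));       // 6
  }
}

DefaultResourceCalculator (first hunk above) applies this to memory only, while DominantResourceCalculator rounds both memory and vcores.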

View File

@ -546,4 +546,9 @@ public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
Resource resource) {
return rc.isAnyMajorResourceZero(resource);
}
public static Resource normalizeDown(ResourceCalculator calculator,
Resource resource, Resource factor) {
return calculator.normalizeDown(resource, factor);
}
}

View File

@ -140,10 +140,10 @@ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
// Can try preempting AM containers (still preserving at most
// maxAMCapacityForThisQueue of AM resources) if more resources need to
// be selected as preemption candidates from this queue.
Resource maxAMCapacityForThisQueue = Resources.multiply(
Resources.multiply(clusterResource,
leafQueue.getAbsoluteCapacity()),
leafQueue.getMaxAMResourcePerQueuePercent());
Resource maxAMCapacityForThisQueue = Resources
.multiply(
leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
leafQueue.getMaxAMResourcePerQueuePercent());
preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
@ -199,7 +199,6 @@ private void preemptAMContainers(Resource clusterResource,
* Given a target preemption for a specific application, select containers
* to preempt (after unreserving all reservation for that app).
*/
@SuppressWarnings("unchecked")
private void preemptFrom(FiCaSchedulerApp app,
Resource clusterResource, Map<String, Resource> resToObtainByPartition,
List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
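maxAMCapacityForThisQueue is now derived from the queue's effective capacity for the default partition rather than from clusterResource times absoluteCapacity. Illustrative arithmetic with invented numbers:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

// Invented values: effective queue capacity and AM-resource percent.
public class AmCapacitySketch {
  public static void main(String[] args) {
    Resource effectiveCapacity = Resource.newInstance(65536, 32); // 64 GB, 32 vcores
    float maxAMResourcePerQueuePercent = 0.1f;

    // Same shape as the updated call site above.
    Resource maxAMCapacityForThisQueue =
        Resources.multiply(effectiveCapacity, maxAMResourcePerQueuePercent);

    // Prints roughly <memory:6553, vCores:3>; multiply truncates per component.
    System.out.println(maxAMCapacityForThisQueue);
  }
}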

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
@ -525,6 +526,13 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
QueueResourceQuotas queueResourceQuotas = curQueue
.getQueueResourceQuotas();
Resource effMinRes = queueResourceQuotas
.getEffectiveMinResource(partitionToLookAt);
Resource effMaxRes = queueResourceQuotas
.getEffectiveMaxResource(partitionToLookAt);
Resource current = Resources
.clone(curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource killable = Resources.none();
@ -550,7 +558,7 @@ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
reserved, curQueue);
reserved, curQueue, effMinRes, effMaxRes);
if (curQueue instanceof ParentQueue) {
String configuredOrderingPolicy =

View File

@ -48,6 +48,9 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
double normalizedGuarantee;
private Resource effMinRes;
private Resource effMaxRes;
final ArrayList<TempQueuePerPartition> children;
private Collection<TempAppPerPartition> apps;
LeafQueue leafQueue;
@ -68,7 +71,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
Resource reserved, CSQueue queue) {
Resource reserved, CSQueue queue, Resource effMinRes,
Resource effMaxRes) {
super(queueName, current, Resource.newInstance(0, 0), reserved,
Resource.newInstance(0, 0));
@ -95,6 +99,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
this.absCapacity = absCapacity;
this.absMaxCapacity = absMaxCapacity;
this.totalPartitionResource = totalPartitionResource;
this.effMinRes = effMinRes;
this.effMaxRes = effMaxRes;
}
public void setLeafQueue(LeafQueue l) {
@ -177,10 +183,18 @@ Resource offer(Resource avail, ResourceCalculator rc,
}
public Resource getGuaranteed() {
if(!effMinRes.equals(Resources.none())) {
return Resources.clone(effMinRes);
}
return Resources.multiply(totalPartitionResource, absCapacity);
}
public Resource getMax() {
if(!effMaxRes.equals(Resources.none())) {
return Resources.clone(effMaxRes);
}
return Resources.multiply(totalPartitionResource, absMaxCapacity);
}

View File

@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
 * Base class for tracking resources per node label for a queue, user or
 * application. This class is thread-safe.
 */
public class AbstractResourceUsage {
protected ReadLock readLock;
protected WriteLock writeLock;
protected Map<String, UsageByLabel> usages;
// short for no-label :)
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
public AbstractResourceUsage() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
usages = new HashMap<String, UsageByLabel>();
usages.put(NL, new UsageByLabel(NL));
}
// Usage enum here to keep the implementation cleaner
public enum ResourceType {
// CACHED_USED and CACHED_PENDING may be read by anyone, but must only
// be written by ordering policies
USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4), CACHED_PENDING(
5), AMLIMIT(6), MIN_RESOURCE(7), MAX_RESOURCE(8), EFF_MIN_RESOURCE(
9), EFF_MAX_RESOURCE(
10), EFF_MIN_RESOURCE_UP(11), EFF_MAX_RESOURCE_UP(12);
private int idx;
private ResourceType(int value) {
this.idx = value;
}
}
public static class UsageByLabel {
// usage by label, contains all UsageType
private Resource[] resArr;
public UsageByLabel(String label) {
resArr = new Resource[ResourceType.values().length];
for (int i = 0; i < resArr.length; i++) {
resArr[i] = Resource.newInstance(0, 0);
};
}
public Resource getUsed() {
return resArr[ResourceType.USED.idx];
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{used=" + resArr[0] + "%, ");
sb.append("pending=" + resArr[1] + "%, ");
sb.append("am_used=" + resArr[2] + "%, ");
sb.append("reserved=" + resArr[3] + "%}");
sb.append("min_eff=" + resArr[9] + "%, ");
sb.append("max_eff=" + resArr[10] + "%}");
sb.append("min_effup=" + resArr[11] + "%, ");
return sb.toString();
}
}
private static Resource normalize(Resource res) {
if (res == null) {
return Resources.none();
}
return res;
}
protected Resource _get(String label, ResourceType type) {
if (label == null) {
label = RMNodeLabelsManager.NO_LABEL;
}
try {
readLock.lock();
UsageByLabel usage = usages.get(label);
if (null == usage) {
return Resources.none();
}
return normalize(usage.resArr[type.idx]);
} finally {
readLock.unlock();
}
}
protected Resource _getAll(ResourceType type) {
try {
readLock.lock();
Resource allOfType = Resources.createResource(0);
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
// all usage types are initialized
Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
}
return allOfType;
} finally {
readLock.unlock();
}
}
private UsageByLabel getAndAddIfMissing(String label) {
if (label == null) {
label = RMNodeLabelsManager.NO_LABEL;
}
if (!usages.containsKey(label)) {
UsageByLabel u = new UsageByLabel(label);
usages.put(label, u);
return u;
}
return usages.get(label);
}
protected void _set(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr[type.idx] = res;
} finally {
writeLock.unlock();
}
}
protected void _inc(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
Resources.addTo(usage.resArr[type.idx], res);
} finally {
writeLock.unlock();
}
}
protected void _dec(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
Resources.subtractFrom(usage.resArr[type.idx], res);
} finally {
writeLock.unlock();
}
}
@Override
public String toString() {
try {
readLock.lock();
return usages.toString();
} finally {
readLock.unlock();
}
}
public Set<String> getNodePartitionsSet() {
try {
readLock.lock();
return usages.keySet();
} finally {
readLock.unlock();
}
}
}

View File

@ -0,0 +1,153 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
/**
 * QueueResourceQuotas tracks the following per-label quotas of a queue:
 * - EFFECTIVE_MIN_CAPACITY
 * - EFFECTIVE_MAX_CAPACITY
 * (plus the configured min/max resources they are derived from).
 *
 * This class is thread-safe.
 */
public class QueueResourceQuotas extends AbstractResourceUsage {
// short for no-label :)
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
public QueueResourceQuotas() {
super();
}
/*
* Configured Minimum Resource
*/
public Resource getConfiguredMinResource() {
return _get(NL, ResourceType.MIN_RESOURCE);
}
public Resource getConfiguredMinResource(String label) {
return _get(label, ResourceType.MIN_RESOURCE);
}
public void setConfiguredMinResource(String label, Resource res) {
_set(label, ResourceType.MIN_RESOURCE, res);
}
public void setConfiguredMinResource(Resource res) {
_set(NL, ResourceType.MIN_RESOURCE, res);
}
/*
* Configured Maximum Resource
*/
public Resource getConfiguredMaxResource() {
return getConfiguredMaxResource(NL);
}
public Resource getConfiguredMaxResource(String label) {
return _get(label, ResourceType.MAX_RESOURCE);
}
public void setConfiguredMaxResource(Resource res) {
setConfiguredMaxResource(NL, res);
}
public void setConfiguredMaxResource(String label, Resource res) {
_set(label, ResourceType.MAX_RESOURCE, res);
}
/*
* Effective Minimum Resource
*/
public Resource getEffectiveMinResource() {
return _get(NL, ResourceType.EFF_MIN_RESOURCE);
}
public Resource getEffectiveMinResource(String label) {
return _get(label, ResourceType.EFF_MIN_RESOURCE);
}
public void setEffectiveMinResource(String label, Resource res) {
_set(label, ResourceType.EFF_MIN_RESOURCE, res);
}
public void setEffectiveMinResource(Resource res) {
_set(NL, ResourceType.EFF_MIN_RESOURCE, res);
}
/*
* Effective Maximum Resource
*/
public Resource getEffectiveMaxResource() {
return getEffectiveMaxResource(NL);
}
public Resource getEffectiveMaxResource(String label) {
return _get(label, ResourceType.EFF_MAX_RESOURCE);
}
public void setEffectiveMaxResource(Resource res) {
setEffectiveMaxResource(NL, res);
}
public void setEffectiveMaxResource(String label, Resource res) {
_set(label, ResourceType.EFF_MAX_RESOURCE, res);
}
/*
 * Effective Minimum Resource (normalized up)
 */
public Resource getEffectiveMinResourceUp() {
return _get(NL, ResourceType.EFF_MIN_RESOURCE_UP);
}
public Resource getEffectiveMinResourceUp(String label) {
return _get(label, ResourceType.EFF_MIN_RESOURCE_UP);
}
public void setEffectiveMinResourceUp(String label, Resource res) {
_set(label, ResourceType.EFF_MIN_RESOURCE_UP, res);
}
public void setEffectiveMinResourceUp(Resource res) {
_set(NL, ResourceType.EFF_MIN_RESOURCE_UP, res);
}
/*
 * Effective Maximum Resource (normalized up)
 */
public Resource getEffectiveMaxResourceUp() {
return getEffectiveMaxResourceUp(NL);
}
public Resource getEffectiveMaxResourceUp(String label) {
return _get(label, ResourceType.EFF_MAX_RESOURCE_UP);
}
public void setEffectiveMaxResourceUp(Resource res) {
setEffectiveMaxResourceUp(NL, res);
}
public void setEffectiveMaxResourceUp(String label, Resource res) {
_set(label, ResourceType.EFF_MAX_RESOURCE_UP, res);
}
}
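A hedged usage sketch of the new QueueResourceQuotas store (resource values and the "gpu" label are invented): it keeps configured and effective min/max resources per node label, with the no-label entry used when no label is passed.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;

public class QueueResourceQuotasSketch {
  public static void main(String[] args) {
    QueueResourceQuotas quotas = new QueueResourceQuotas();

    // Admin-configured absolute min/max for the default (no-label) partition.
    quotas.setConfiguredMinResource(Resource.newInstance(4096, 4));
    quotas.setConfiguredMaxResource(Resource.newInstance(16384, 16));

    // Effective values are normally derived by the scheduler (see the
    // ParentQueue changes below); set by hand here for illustration only.
    quotas.setEffectiveMinResource("gpu", Resource.newInstance(8192, 8));

    System.out.println(quotas.getConfiguredMinResource());    // <memory:4096, vCores:4>
    System.out.println(quotas.getEffectiveMinResource("gpu")); // <memory:8192, vCores:8>
    // An entry that was never set reads back as a zero resource.
    System.out.println(quotas.getEffectiveMaxResource("gpu"));
  }
}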

View File

@ -39,63 +39,12 @@
*
* And it is thread-safe
*/
public class ResourceUsage {
private ReadLock readLock;
private WriteLock writeLock;
private Map<String, UsageByLabel> usages;
public class ResourceUsage extends AbstractResourceUsage {
// short for no-label :)
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
private final UsageByLabel usageNoLabel;
public ResourceUsage() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
usages = new HashMap<String, UsageByLabel>();
usageNoLabel = new UsageByLabel(NL);
usages.put(NL, usageNoLabel);
}
// Usage enum here to make implement cleaner
private enum ResourceType {
//CACHED_USED and CACHED_PENDING may be read by anyone, but must only
//be written by ordering policies
USED(0), PENDING(1), AMUSED(2), RESERVED(3), CACHED_USED(4),
CACHED_PENDING(5), AMLIMIT(6);
private int idx;
private ResourceType(int value) {
this.idx = value;
}
}
private static class UsageByLabel {
// usage by label, contains all UsageType
private Resource[] resArr;
public UsageByLabel(String label) {
resArr = new Resource[ResourceType.values().length];
for (int i = 0; i < resArr.length; i++) {
resArr[i] = Resource.newInstance(0, 0);
};
}
public Resource getUsed() {
return resArr[ResourceType.USED.idx];
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{used=" + resArr[0] + "%, ");
sb.append("pending=" + resArr[1] + "%, ");
sb.append("am_used=" + resArr[2] + "%, ");
sb.append("reserved=" + resArr[3] + "%}");
sb.append("am_limit=" + resArr[6] + "%, ");
return sb.toString();
}
super();
}
/*
@ -109,22 +58,6 @@ public Resource getUsed(String label) {
return _get(label, ResourceType.USED);
}
public Resource getCachedUsed() {
return _get(NL, ResourceType.CACHED_USED);
}
public Resource getCachedUsed(String label) {
return _get(label, ResourceType.CACHED_USED);
}
public Resource getCachedPending() {
return _get(NL, ResourceType.CACHED_PENDING);
}
public Resource getCachedPending(String label) {
return _get(label, ResourceType.CACHED_PENDING);
}
public void incUsed(String label, Resource res) {
_inc(label, ResourceType.USED, res);
}
@ -145,7 +78,7 @@ public void setUsed(Resource res) {
setUsed(NL, res);
}
public void copyAllUsed(ResourceUsage other) {
public void copyAllUsed(AbstractResourceUsage other) {
try {
writeLock.lock();
for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
@ -160,22 +93,6 @@ public void setUsed(String label, Resource res) {
_set(label, ResourceType.USED, res);
}
public void setCachedUsed(String label, Resource res) {
_set(label, ResourceType.CACHED_USED, res);
}
public void setCachedUsed(Resource res) {
_set(NL, ResourceType.CACHED_USED, res);
}
public void setCachedPending(String label, Resource res) {
_set(label, ResourceType.CACHED_PENDING, res);
}
public void setCachedPending(Resource res) {
_set(NL, ResourceType.CACHED_PENDING, res);
}
/*
* Pending
*/
@ -281,6 +198,47 @@ public void setAMUsed(String label, Resource res) {
_set(label, ResourceType.AMUSED, res);
}
public Resource getAllPending() {
return _getAll(ResourceType.PENDING);
}
public Resource getAllUsed() {
return _getAll(ResourceType.USED);
}
// Cache Used
public Resource getCachedUsed() {
return _get(NL, ResourceType.CACHED_USED);
}
public Resource getCachedUsed(String label) {
return _get(label, ResourceType.CACHED_USED);
}
public Resource getCachedPending() {
return _get(NL, ResourceType.CACHED_PENDING);
}
public Resource getCachedPending(String label) {
return _get(label, ResourceType.CACHED_PENDING);
}
public void setCachedUsed(String label, Resource res) {
_set(label, ResourceType.CACHED_USED, res);
}
public void setCachedUsed(Resource res) {
_set(NL, ResourceType.CACHED_USED, res);
}
public void setCachedPending(String label, Resource res) {
_set(label, ResourceType.CACHED_PENDING, res);
}
public void setCachedPending(Resource res) {
_set(NL, ResourceType.CACHED_PENDING, res);
}
/*
* AM-Resource Limit
*/
@ -316,94 +274,6 @@ public void setAMLimit(String label, Resource res) {
_set(label, ResourceType.AMLIMIT, res);
}
private static Resource normalize(Resource res) {
if (res == null) {
return Resources.none();
}
return res;
}
private Resource _get(String label, ResourceType type) {
if (label == null || label.equals(NL)) {
return normalize(usageNoLabel.resArr[type.idx]);
}
try {
readLock.lock();
UsageByLabel usage = usages.get(label);
if (null == usage) {
return Resources.none();
}
return normalize(usage.resArr[type.idx]);
} finally {
readLock.unlock();
}
}
private Resource _getAll(ResourceType type) {
try {
readLock.lock();
Resource allOfType = Resources.createResource(0);
for (Map.Entry<String, UsageByLabel> usageEntry : usages.entrySet()) {
//all usages types are initialized
Resources.addTo(allOfType, usageEntry.getValue().resArr[type.idx]);
}
return allOfType;
} finally {
readLock.unlock();
}
}
public Resource getAllPending() {
return _getAll(ResourceType.PENDING);
}
public Resource getAllUsed() {
return _getAll(ResourceType.USED);
}
private UsageByLabel getAndAddIfMissing(String label) {
if (label == null || label.equals(NL)) {
return usageNoLabel;
}
if (!usages.containsKey(label)) {
UsageByLabel u = new UsageByLabel(label);
usages.put(label, u);
return u;
}
return usages.get(label);
}
private void _set(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
usage.resArr[type.idx] = res;
} finally {
writeLock.unlock();
}
}
private void _inc(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
Resources.addTo(usage.resArr[type.idx], res);
} finally {
writeLock.unlock();
}
}
private void _dec(String label, ResourceType type, Resource res) {
try {
writeLock.lock();
UsageByLabel usage = getAndAddIfMissing(label);
Resources.subtractFrom(usage.resArr[type.idx], res);
} finally {
writeLock.unlock();
}
}
public Resource getCachedDemand(String label) {
try {
readLock.lock();
@ -415,23 +285,4 @@ public Resource getCachedDemand(String label) {
readLock.unlock();
}
}
@Override
public String toString() {
try {
readLock.lock();
return usages.toString();
} finally {
readLock.unlock();
}
}
public Set<String> getNodePartitionsSet() {
try {
readLock.lock();
return usages.keySet();
} finally {
readLock.unlock();
}
}
}
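ResourceUsage now only adds the usage-specific accessors on top of AbstractResourceUsage. A small usage sketch (the "gpu" label and values are invented):

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;

public class ResourceUsageSketch {
  public static void main(String[] args) {
    ResourceUsage usage = new ResourceUsage();

    // Used resources are tracked per node label; the no-arg variants
    // operate on the NO_LABEL partition.
    usage.incUsed(Resource.newInstance(2048, 2));
    usage.incUsed("gpu", Resource.newInstance(4096, 4));

    System.out.println(usage.getUsed());      // no-label partition
    System.out.println(usage.getUsed("gpu")); // "gpu" partition
    System.out.println(usage.getAllUsed());   // sum across all labels
  }
}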

View File

@ -56,6 +56,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AbsoluteResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -87,6 +89,7 @@ public abstract class AbstractCSQueue implements CSQueue {
final ResourceCalculator resourceCalculator;
Set<String> accessibleLabels;
Set<String> resourceTypes;
final RMNodeLabelsManager labelManager;
String defaultLabelExpression;
@ -102,6 +105,14 @@ public abstract class AbstractCSQueue implements CSQueue {
// etc.
QueueCapacities queueCapacities;
QueueResourceQuotas queueResourceQuotas;
protected enum CapacityConfigType {
NONE, PERCENTAGE, ABSOLUTE_RESOURCE
};
protected CapacityConfigType capacityConfigType =
CapacityConfigType.NONE;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
@ -141,6 +152,9 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);
// initialize queueResourceQuotas
queueResourceQuotas = new QueueResourceQuotas();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
@ -276,6 +290,10 @@ void setupQueueConfigs(Resource clusterResource)
this.defaultLabelExpression =
csContext.getConfiguration().getDefaultNodeLabelExpression(
getQueuePath());
this.resourceTypes = new HashSet<String>();
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
resourceTypes.add(type.toString().toLowerCase());
}
// inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
@ -292,6 +310,11 @@ void setupQueueConfigs(Resource clusterResource)
// After we setup labels, we can setup capacities
setupConfigurableCapacities();
// Also fetch minimum/maximum resource constraint for this queue if
// configured.
capacityConfigType = CapacityConfigType.NONE;
updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
this.maximumAllocation =
csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath());
@ -364,6 +387,125 @@ private Map<String, Float> getUserWeightsFromHierarchy() throws IOException {
return unionInheritedWeights;
}
protected void updateConfigurableResourceRequirement(String queuePath,
Resource clusterResource) {
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
Set<String> configuredNodelabels = conf.getConfiguredNodeLabels(queuePath);
for (String label : configuredNodelabels) {
Resource minResource = conf.getMinimumResourceRequirement(label,
queuePath, resourceTypes);
Resource maxResource = conf.getMaximumResourceRequirement(label,
queuePath, resourceTypes);
if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
this.capacityConfigType = (!minResource.equals(Resources.none())
&& queueCapacities.getAbsoluteCapacity(label) == 0f)
? CapacityConfigType.ABSOLUTE_RESOURCE
: CapacityConfigType.PERCENTAGE;
if (LOG.isDebugEnabled()) {
LOG.debug("capacityConfigType is updated as '" + capacityConfigType
+ "' for queue '" + getQueueName());
}
}
validateAbsoluteVsPercentageCapacityConfig(minResource);
// If min resource for a resource type is greater than its max resource,
// throw exception to handle such error configs.
if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
resourceCalculator, clusterResource, minResource, maxResource)) {
throw new IllegalArgumentException("Min resource configuration "
+ minResource + " is greater than its max value:" + maxResource
+ " in queue:" + getQueueName());
}
// If a child's max resource is greater than its parent's max resource,
// throw an exception to flag the misconfiguration.
if (parent != null) {
Resource parentMaxRes = parent.getQueueResourceQuotas()
.getConfiguredMaxResource(label);
if (Resources.greaterThan(resourceCalculator, clusterResource,
parentMaxRes, Resources.none())) {
if (Resources.greaterThan(resourceCalculator, clusterResource,
maxResource, parentMaxRes)) {
throw new IllegalArgumentException("Max resource configuration "
+ maxResource + " is greater than parents max value:"
+ parentMaxRes + " in queue:" + getQueueName());
}
// If child's max resource is not set, but its parent max resource is
// set, we must set child max resource to its parent's.
if (maxResource.equals(Resources.none())
&& !minResource.equals(Resources.none())) {
maxResource = Resources.clone(parentMaxRes);
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating absolute resource configuration for queue:"
+ getQueueName() + " as minResource=" + minResource
+ " and maxResource=" + maxResource);
}
queueResourceQuotas.setConfiguredMinResource(label, minResource);
queueResourceQuotas.setConfiguredMaxResource(label, maxResource);
}
}
private void validateAbsoluteVsPercentageCapacityConfig(
Resource minResource) {
CapacityConfigType localType = CapacityConfigType.PERCENTAGE;
if (!minResource.equals(Resources.none())) {
localType = CapacityConfigType.ABSOLUTE_RESOURCE;
}
if (!queueName.equals("root")
&& !this.capacityConfigType.equals(localType)) {
throw new IllegalArgumentException("Queue '" + getQueueName()
+ "' should use either percentage based capacity"
+ " configuration or absolute resource.");
}
}
@Override
public CapacityConfigType getCapacityConfigType() {
return capacityConfigType;
}
@Override
public Resource getEffectiveCapacity(String label) {
return Resources
.clone(getQueueResourceQuotas().getEffectiveMinResource(label));
}
@Override
public Resource getEffectiveCapacityUp(String label) {
return Resources
.clone(getQueueResourceQuotas().getEffectiveMinResourceUp(label));
}
@Override
public Resource getEffectiveCapacityDown(String label, Resource factor) {
return Resources.normalizeDown(resourceCalculator,
getQueueResourceQuotas().getEffectiveMinResource(label),
minimumAllocation);
}
@Override
public Resource getEffectiveMaxCapacity(String label) {
return Resources
.clone(getQueueResourceQuotas().getEffectiveMaxResource(label));
}
@Override
public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
return Resources.normalizeDown(resourceCalculator,
getQueueResourceQuotas().getEffectiveMaxResource(label),
minimumAllocation);
}
private void initializeQueueState(QueueState previousState,
QueueState configuredState, QueueState parentState) {
// verify that no value other than RUNNING/STOPPED can be set for State
@ -554,6 +696,11 @@ public ResourceUsage getQueueResourceUsage() {
return queueUsage;
}
@Override
public QueueResourceQuotas getQueueResourceQuotas() {
return queueResourceQuotas;
}
@Override
public ReentrantReadWriteLock.ReadLock getReadLock() {
return readLock;
@ -604,7 +751,7 @@ private Resource getCurrentLimitResource(String nodePartition,
* limit-set-by-parent)
*/
Resource queueMaxResource =
getQueueMaxResource(nodePartition, clusterResource);
getQueueMaxResource(nodePartition);
return Resources.min(resourceCalculator, clusterResource,
queueMaxResource, currentResourceLimits.getLimit());
@ -617,11 +764,8 @@ private Resource getCurrentLimitResource(String nodePartition,
return Resources.none();
}
Resource getQueueMaxResource(String nodePartition, Resource clusterResource) {
return Resources.multiplyAndNormalizeDown(resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
queueCapacities.getAbsoluteMaximumCapacity(nodePartition),
minimumAllocation);
Resource getQueueMaxResource(String nodePartition) {
return getEffectiveMaxCapacity(nodePartition);
}
public boolean hasChildQueues() {
@ -782,7 +926,7 @@ public void incUsedResource(String nodeLabel, Resource resourceToInc,
queueUsage.incUsed(nodeLabel, resourceToInc);
CSQueueUtils.updateUsedCapacity(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
nodeLabel, this);
Resources.none(), nodeLabel, this);
if (null != parent) {
parent.incUsedResource(nodeLabel, resourceToInc, null);
}
@ -798,7 +942,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec,
queueUsage.decUsed(nodeLabel, resourceToDec);
CSQueueUtils.updateUsedCapacity(resourceCalculator,
labelManager.getResourceByLabel(nodeLabel, Resources.none()),
nodeLabel, this);
Resources.none(), nodeLabel, this);
if (null != parent) {
parent.decUsedResource(nodeLabel, resourceToDec, null);
}
@ -904,7 +1048,7 @@ public boolean accept(Resource cluster,
Resource maxResourceLimit;
if (allocation.getSchedulingMode()
== SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
maxResourceLimit = getQueueMaxResource(partition, cluster);
maxResourceLimit = getQueueMaxResource(partition);
} else{
maxResourceLimit = labelManager.getResourceByLabel(
schedulerContainer.getNodePartition(), cluster);
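The checks inside updateConfigurableResourceRequirement reduce to a few rules per label. A condensed, hypothetical restatement (the class and method names here are mine; only the Resources calls mirror the code above): min must not exceed max, and a child's max must not exceed its parent's max. Mixing percentage and absolute configuration is rejected separately by validateAbsoluteVsPercentageCapacityConfig.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

// Condensed restatement of the per-label min/max validation (sketch only).
public final class MinMaxConfigChecks {

  static void check(ResourceCalculator rc, Resource cluster,
      Resource min, Resource max, Resource parentMax, String queue) {
    // Rule 1: a queue's min resource must not exceed its own max resource.
    if (!max.equals(Resources.none())
        && Resources.greaterThan(rc, cluster, min, max)) {
      throw new IllegalArgumentException(
          "min " + min + " > max " + max + " in queue " + queue);
    }
    // Rule 2: a child's max resource must not exceed its parent's max.
    if (!parentMax.equals(Resources.none())
        && Resources.greaterThan(rc, cluster, max, parentMax)) {
      throw new IllegalArgumentException(
          "max " + max + " > parent's max " + parentMax + " in queue " + queue);
    }
  }
}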

View File

@ -41,10 +41,12 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -355,4 +357,41 @@ public void validateSubmitApplication(ApplicationId applicationId,
* @return map of usernames and corresponding weight
*/
Map<String, Float> getUserWeights();
/**
* Get QueueResourceQuotas associated with each queue.
* @return QueueResourceQuotas
*/
public QueueResourceQuotas getQueueResourceQuotas();
/**
* Get CapacityConfigType as PERCENTAGE or ABSOLUTE_RESOURCE
* @return CapacityConfigType
*/
public CapacityConfigType getCapacityConfigType();
/**
 * Get the effective (guaranteed) capacity of the queue. If absolute min/max
 * resources are configured, they take precedence over the percentage-based
 * capacity. The Down variant additionally normalizes the result down by the
 * given factor.
*
* @param label
* partition
* @return effective queue capacity
*/
Resource getEffectiveCapacity(String label);
Resource getEffectiveCapacityUp(String label);
Resource getEffectiveCapacityDown(String label, Resource factor);
/**
 * Get the effective maximum capacity of the queue. If absolute min/max
 * resources are configured, they take precedence over the percentage-based
 * capacity. The Down variant additionally normalizes the result down by the
 * given factor.
*
* @param label
* partition
* @return effective max queue capacity
*/
Resource getEffectiveMaxCapacity(String label);
Resource getEffectiveMaxCapacityDown(String label, Resource factor);
}
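A minimal sketch of the preference order described in the Javadoc above (field names are illustrative; the same fallback appears in TempQueuePerPartition.getGuaranteed earlier in this commit): an absolute effective resource, when present, wins over the percentage-based capacity.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

// Sketch: absolute (effective) resources take precedence over percentages.
public class EffectiveCapacitySketch {

  static Resource effectiveCapacity(Resource effMinRes,
      Resource partitionResource, float absoluteCapacity) {
    if (!effMinRes.equals(Resources.none())) {
      return Resources.clone(effMinRes);                 // absolute config wins
    }
    return Resources.multiply(partitionResource, absoluteCapacity); // fallback
  }

  public static void main(String[] args) {
    Resource partition = Resource.newInstance(102400, 100);
    // No absolute config: 30% of the partition resource.
    System.out.println(effectiveCapacity(Resources.none(), partition, 0.3f));
    // Absolute config present: returned as-is.
    System.out.println(
        effectiveCapacity(Resource.newInstance(20480, 16), partition, 0.3f));
  }
}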

View File

@ -180,8 +180,8 @@ private static void updateAbsoluteCapacitiesByNodeLabels(
* used resource for all partitions of this queue.
*/
public static void updateUsedCapacity(final ResourceCalculator rc,
final Resource totalPartitionResource, String nodePartition,
AbstractCSQueue childQueue) {
final Resource totalPartitionResource, Resource clusterResource,
String nodePartition, AbstractCSQueue childQueue) {
QueueCapacities queueCapacities = childQueue.getQueueCapacities();
CSQueueMetrics queueMetrics = childQueue.getMetrics();
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
@ -193,11 +193,8 @@ public static void updateUsedCapacity(final ResourceCalculator rc,
if (Resources.greaterThan(rc, totalPartitionResource,
totalPartitionResource, Resources.none())) {
// queueGuaranteed = totalPartitionedResource *
// absolute_capacity(partition)
Resource queueGuranteedResource =
Resources.multiply(totalPartitionResource,
queueCapacities.getAbsoluteCapacity(nodePartition));
Resource queueGuranteedResource = childQueue
.getEffectiveCapacity(nodePartition);
// make queueGuranteedResource >= minimum_allocation to avoid division by zero.
queueGuranteedResource =
@ -248,9 +245,7 @@ private static Resource getMaxAvailableResourceToQueue(
for (String partition : nodeLabels) {
// Calculate guaranteed resource for a label in a queue by below logic.
// (total label resource) * (absolute capacity of label in that queue)
Resource queueGuranteedResource = Resources.multiply(nlm
.getResourceByLabel(partition, cluster), queue.getQueueCapacities()
.getAbsoluteCapacity(partition));
Resource queueGuranteedResource = queue.getEffectiveCapacity(partition);
// Available resource in queue for a specific label will be calculated as
// {(guaranteed resource for a label in a queue) -
@ -289,15 +284,14 @@ public static void updateQueueStatistics(
ResourceUsage queueResourceUsage = childQueue.getQueueResourceUsage();
if (nodePartition == null) {
for (String partition : Sets.union(
queueCapacities.getNodePartitionsSet(),
for (String partition : Sets.union(queueCapacities.getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
updateUsedCapacity(rc, nlm.getResourceByLabel(partition, cluster),
partition, childQueue);
cluster, partition, childQueue);
}
} else {
updateUsedCapacity(rc, nlm.getResourceByLabel(nodePartition, cluster),
nodePartition, childQueue);
cluster, nodePartition, childQueue);
}
// Update queue metrics w.r.t node labels. In a generic way, we can

View File

@ -47,6 +47,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -60,6 +61,8 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.Set;
import java.util.StringTokenizer;
@ -317,6 +320,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final int DEFAULT_MAX_ASSIGN_PER_HEARTBEAT = -1;
/** Configuring absolute min/max resources in a queue **/
@Private
public static final String MINIMUM_RESOURCE = "min-resource";
@Private
public static final String MAXIMUM_RESOURCE = "max-resource";
public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)";
public enum AbsoluteResourceType {
MEMORY, VCORES;
}
AppPriorityACLConfigurationParser priorityACLConfig = new AppPriorityACLConfigurationParser();
public CapacitySchedulerConfiguration() {
@ -394,7 +412,7 @@ public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
public float getNonLabeledQueueCapacity(String queue) {
float capacity = queue.equals("root") ? 100.0f : getFloat(
getQueuePrefix(queue) + CAPACITY, UNDEFINED);
getQueuePrefix(queue) + CAPACITY, 0f);
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal " +
"capacity of " + capacity + " for queue " + queue);
@ -1676,4 +1694,163 @@ public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
queuePath);
setMaximumCapacity(leafQueueConfPrefix, val);
}
public static String getUnits(String resourceValue) {
String units;
for (int i = 0; i < resourceValue.length(); i++) {
if (Character.isAlphabetic(resourceValue.charAt(i))) {
units = resourceValue.substring(i);
if (StringUtils.isAlpha(units)) {
return units;
}
}
}
return "";
}
/**
* Get absolute minimum resource requirement for a queue.
*
* @param label
* NodeLabel
* @param queue
* queue path
* @param resourceTypes
* Resource types
* @return ResourceInformation
*/
public Resource getMinimumResourceRequirement(String label, String queue,
Set<String> resourceTypes) {
return internalGetLabeledResourceRequirementForQueue(queue, label,
resourceTypes, MINIMUM_RESOURCE);
}
/**
* Get absolute maximum resource requirement for a queue.
*
* @param label
* NodeLabel
* @param queue
* queue path
* @param resourceTypes
* Resource types
* @return Resource
*/
public Resource getMaximumResourceRequirement(String label, String queue,
Set<String> resourceTypes) {
return internalGetLabeledResourceRequirementForQueue(queue, label,
resourceTypes, MAXIMUM_RESOURCE);
}
@VisibleForTesting
public void setMinimumResourceRequirement(String label, String queue,
Resource resource) {
updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE);
}
@VisibleForTesting
public void setMaximumResourceRequirement(String label, String queue,
Resource resource) {
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE);
}
private void updateMinMaxResourceToConf(String label, String queue,
Resource resource, String type) {
if (queue.equals("root")) {
throw new IllegalArgumentException(
"Cannot set resource, root queue will take 100% of cluster capacity");
}
StringBuilder resourceString = new StringBuilder();
resourceString
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ resource.getMemorySize() + ","
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ resource.getVirtualCores() + "]");
String prefix = getQueuePrefix(queue) + type;
if (!label.isEmpty()) {
prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label
+ DOT + type;
}
set(prefix, resourceString.toString());
}
private Resource internalGetLabeledResourceRequirementForQueue(String queue,
String label, Set<String> resourceTypes, String suffix) {
String propertyName = getNodeLabelPrefix(queue, label) + suffix;
String resourceString = get(propertyName);
if (resourceString == null || resourceString.isEmpty()) {
return Resources.none();
}
// Define resource here.
Resource resource = Resource.newInstance(0l, 0);
Matcher matcher = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE)
.matcher(resourceString);
/*
 * Absolute resource configuration for a queue is grouped by "[]".
 * The syntax looks like "[memory=4Gi,vcores=2]", which means 4 GiB of
 * memory and 2 vcores.
 */
if (matcher.find()) {
// Get the sub-group.
String subGroup = matcher.group(1);
if (subGroup.trim().isEmpty()) {
return Resources.none();
}
for (String kvPair : subGroup.trim().split(",")) {
String[] splits = kvPair.split("=");
// Ensure that each sub string is key value pair separated by '='.
if (splits != null && splits.length > 1) {
updateResourceValuesFromConfig(resourceTypes, resource, splits);
}
}
}
// Memory has to be configured always.
if (resource.getMemorySize() == 0l) {
return Resources.none();
}
if (LOG.isDebugEnabled()) {
LOG.debug("CSConf - getAbsolueResourcePerQueue: prefix="
+ getNodeLabelPrefix(queue, label) + ", capacity=" + resource);
}
return resource;
}
private void updateResourceValuesFromConfig(Set<String> resourceTypes,
Resource resource, String[] splits) {
// If key is not a valid type, skip it.
if (!resourceTypes.contains(splits[0])) {
return;
}
String units = getUnits(splits[1]);
Long resourceValue = Long
.valueOf(splits[1].substring(0, splits[1].length() - units.length()));
// Convert all incoming units to MB if units is configured.
if (!units.isEmpty()) {
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
}
// map it based on key.
AbsoluteResourceType resType = AbsoluteResourceType
.valueOf(StringUtils.toUpperCase(splits[0].trim()));
switch (resType) {
case MEMORY :
resource.setMemorySize(resourceValue);
break;
case VCORES :
resource.setVirtualCores(resourceValue.intValue());
break;
default :
break;
}
}
}
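A self-contained sketch of how an absolute-resource value such as "[memory=4Gi,vcores=2]" is parsed by internalGetLabeledResourceRequirementForQueue above: the bracketed group is captured with PATTERN_FOR_ABSOLUTE_RESOURCE, split on ',' and '=', and memory units are converted to MiB. The helper names below are mine; only the regex and the UnitsConversionUtil call mirror the code.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;

// Sketch of absolute-resource string parsing (invented helper, same regex).
public class AbsoluteResourceParseSketch {
  private static final Pattern P = Pattern.compile("\\[([^\\]]+)");

  static Resource parse(String value) {
    Resource resource = Resource.newInstance(0L, 0);
    Matcher m = P.matcher(value);
    if (!m.find()) {
      return resource;
    }
    for (String kv : m.group(1).trim().split(",")) {
      String[] parts = kv.split("=");
      String key = parts[0].trim();
      String raw = parts[1].trim();
      // Trailing letters (e.g. "Gi") are the units; the rest is the number.
      String units = raw.replaceAll("^[0-9]+", "");
      long num = Long.parseLong(raw.substring(0, raw.length() - units.length()));
      if (key.equals("memory")) {
        // Convert to MiB when a unit suffix is present.
        resource.setMemorySize(
            units.isEmpty() ? num : UnitsConversionUtil.convert(units, "Mi", num));
      } else if (key.equals("vcores")) {
        resource.setVirtualCores((int) num);
      }
    }
    return resource;
  }

  public static void main(String[] args) {
    System.out.println(parse("[memory=4Gi,vcores=2]")); // <memory:4096, vCores:2>
  }
}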

View File

@ -671,12 +671,7 @@ public Resource getUserAMResourceLimitPerPartition(
1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
Resource queuePartitionResource = Resources
.multiplyAndNormalizeUp(resourceCalculator,
labelManager.getResourceByLabel(nodePartition,
lastClusterResource),
queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
@ -705,11 +700,7 @@ public Resource calculateAndGetAMResourceLimitPerPartition(
* non-labeled), * with per-partition am-resource-percent to get the max am
* resource limit for this queue and partition.
*/
Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, lastClusterResource),
queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
Resource queuePartitionResource = getEffectiveCapacityUp(nodePartition);
Resource queueCurrentLimit = Resources.none();
// For non-labeled partition, we need to consider the current queue
@ -965,6 +956,14 @@ private FiCaSchedulerApp getApplication(
private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
// Set preemption-allowed:
// For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
if (!queueResourceQuotas.getEffectiveMinResource(nodePartition)
.equals(Resources.none())) {
limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
csContext.getClusterResource(), queueUsage.getUsed(nodePartition),
queueResourceQuotas.getEffectiveMinResource(nodePartition)));
return;
}
float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
@ -1344,7 +1343,7 @@ private Resource getHeadroom(User user,
currentPartitionResourceLimit =
partition.equals(RMNodeLabelsManager.NO_LABEL)
? currentPartitionResourceLimit
: getQueueMaxResource(partition, clusterResource);
: getQueueMaxResource(partition);
Resource headroom = Resources.componentwiseMin(
Resources.subtract(userLimitResource, user.getUsed(partition)),
@ -1716,12 +1715,8 @@ private void updateCurrentResourceLimits(
// this. So need cap limits by queue's max capacity here.
this.cachedResourceLimitsForHeadroom =
new ResourceLimits(currentResourceLimits.getLimit());
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
minimumAllocation);
Resource queueMaxResource = getEffectiveMaxCapacityDown(
RMNodeLabelsManager.NO_LABEL, minimumAllocation);
this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
resourceCalculator, clusterResource, queueMaxResource,
currentResourceLimits.getLimit()));
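With an absolute minimum configured, setPreemptionAllowed above marks a leaf queue as allowed to preempt only while its used resources stay below its effective minimum. A minimal sketch of that decision with invented numbers:

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

// Sketch of the new preemption-allowed check for absolute-resource queues.
public class PreemptionAllowedSketch {
  public static void main(String[] args) {
    ResourceCalculator rc = new DefaultResourceCalculator();
    Resource cluster = Resource.newInstance(102400, 100);
    Resource effectiveMin = Resource.newInstance(20480, 20);

    Resource used = Resource.newInstance(10240, 8);
    // Under-utilized (used < effective min): allowed to preempt from others.
    System.out.println(Resources.lessThan(rc, cluster, used, effectiveMin)); // true

    used = Resource.newInstance(30720, 24);
    // At or over its guarantee: preemption on behalf of this queue is not allowed.
    System.out.println(Resources.lessThan(rc, cluster, used, effectiveMin)); // false
  }
}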

View File

@ -25,6 +25,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@ -67,6 +68,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@Private
@ -162,31 +164,78 @@ void setChildQueues(Collection<CSQueue> childQueues) {
writeLock.lock();
// Validate
float childCapacities = 0;
Resource minResDefaultLabel = Resources.createResource(0, 0);
for (CSQueue queue : childQueues) {
childCapacities += queue.getCapacity();
Resources.addTo(minResDefaultLabel, queue.getQueueResourceQuotas()
.getConfiguredMinResource());
// If any child queue is using percentage based capacity model vs parent
// queues' absolute configuration or vice versa, throw back an
// exception.
if (!queueName.equals("root") && getCapacity() != 0f
&& !queue.getQueueResourceQuotas().getConfiguredMinResource()
.equals(Resources.none())) {
throw new IllegalArgumentException("Parent queue '" + getQueueName()
+ "' and child queue '" + queue.getQueueName()
+ "' should use either percentage based capacity"
+ " configuration or absolute resource together.");
}
}
float delta = Math.abs(1.0f - childCapacities); // crude way to check
// allow capacities being set to 0, and enforce child 0 if parent is 0
if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) || (
(queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException(
"Illegal" + " capacity of " + childCapacities
+ " for children of queue " + queueName);
if ((minResDefaultLabel.equals(Resources.none())
&& (queueCapacities.getCapacity() > 0) && (delta > PRECISION))
|| ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException("Illegal" + " capacity of "
+ childCapacities + " for children of queue " + queueName);
}
// check label capacities
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
// check children's labels
float sum = 0;
Resource minRes = Resources.createResource(0, 0);
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
scheduler.getClusterResource());
for (CSQueue queue : childQueues) {
sum += queue.getQueueCapacities().getCapacity(nodeLabel);
// If any child queue of a label is using percentage based capacity
// model vs parent queues' absolute configuration or vice versa, throw
// back an exception
if (!queueName.equals("root") && !this.capacityConfigType
.equals(queue.getCapacityConfigType())) {
throw new IllegalArgumentException("Parent queue '" + getQueueName()
+ "' and child queue '" + queue.getQueueName()
+ "' should use either percentage based capacity"
+ "configuration or absolute resource together for label:"
+ nodeLabel);
}
// Accumulate all min/max resource configured for all child queues.
Resources.addTo(minRes, queue.getQueueResourceQuotas()
.getConfiguredMinResource(nodeLabel));
}
if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
if ((minResDefaultLabel.equals(Resources.none()) && capacityByLabel > 0
&& Math.abs(1.0f - sum) > PRECISION)
|| (capacityByLabel == 0) && (sum > 0)) {
throw new IllegalArgumentException(
"Illegal" + " capacity of " + sum + " for children of queue "
+ queueName + " for label=" + nodeLabel);
}
// Ensure that for each parent queue: parent.min-resource >=
// Σ(child.min-resource).
Resource parentMinResource = queueResourceQuotas
.getConfiguredMinResource(nodeLabel);
if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
throw new IllegalArgumentException("Parent Queues" + " capacity: "
+ parentMinResource + " is less than" + " to its children:"
+ minRes + " for queue:" + queueName);
}
}
this.childQueues.clear();
@ -687,11 +736,8 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child,
child.getQueueResourceUsage().getUsed(nodePartition));
// Get child's max resource
Resource childConfiguredMaxResource = Resources.multiplyAndNormalizeDown(
resourceCalculator,
labelManager.getResourceByLabel(nodePartition, clusterResource),
child.getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition),
minimumAllocation);
Resource childConfiguredMaxResource = getEffectiveMaxCapacityDown(
nodePartition, minimumAllocation);
// Child's limit should be capped by child configured max resource
childLimit =
@ -827,6 +873,14 @@ public void updateClusterResource(Resource clusterResource,
ResourceLimits resourceLimits) {
try {
writeLock.lock();
// Update effective capacity in all parent queue.
Set<String> configuredNodelabels = csContext.getConfiguration()
.getConfiguredNodeLabels(getQueuePath());
for (String label : configuredNodelabels) {
calculateEffectiveResourcesAndCapacity(label, clusterResource);
}
// Update all children
for (CSQueue childQueue : childQueues) {
// Get ResourceLimits of child queue before assign containers
@ -848,6 +902,110 @@ public boolean hasChildQueues() {
return true;
}
private void calculateEffectiveResourcesAndCapacity(String label,
Resource clusterResource) {
// For root queue, ensure that max/min resource is updated to latest
// cluster resource.
Resource resourceByLabel = labelManager.getResourceByLabel(label,
clusterResource);
if (getQueueName().equals("root")) {
queueResourceQuotas.setConfiguredMinResource(label, resourceByLabel);
queueResourceQuotas.setConfiguredMaxResource(label, resourceByLabel);
queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel);
queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel);
}
// Total configured min resources of direct children of queue
Resource configuredMinResources = Resource.newInstance(0L, 0);
for (CSQueue childQueue : getChildQueues()) {
Resources.addTo(configuredMinResources,
childQueue.getQueueResourceQuotas().getConfiguredMinResource(label));
}
// Factor used to scale down effective resources: when the cluster has
// sufficient resources, effective_min_resources equals the configured
// min_resources.
float effectiveMinRatio = 1;
ResourceCalculator rc = this.csContext.getResourceCalculator();
if (getQueueName().equals("root")) {
if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
clusterResource, resourceByLabel, configuredMinResources)) {
effectiveMinRatio = Resources.divide(rc, clusterResource,
resourceByLabel, configuredMinResources);
}
} else {
if (Resources.lessThan(rc, clusterResource,
queueResourceQuotas.getEffectiveMinResource(label),
configuredMinResources)) {
effectiveMinRatio = Resources.divide(rc, clusterResource,
queueResourceQuotas.getEffectiveMinResource(label),
configuredMinResources);
}
}
// loop and do this for all child queues
for (CSQueue childQueue : getChildQueues()) {
Resource minResource = childQueue.getQueueResourceQuotas()
.getConfiguredMinResource(label);
// Update effective resource (min/max) to each child queue.
if (childQueue.getCapacityConfigType()
.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
Resources.multiply(minResource, effectiveMinRatio));
// Max resource of a queue should be a minimum of {configuredMaxRes,
// parentMaxRes}. parentMaxRes could be configured value. But if not
// present could also be taken from effective max resource of parent.
Resource parentMaxRes = queueResourceQuotas
.getConfiguredMaxResource(label);
if (parentMaxRes.equals(Resources.none())) {
parentMaxRes = parent.getQueueResourceQuotas()
.getEffectiveMaxResource(label);
}
// Minimum of {childMaxResource, parentMaxRes}. However if
// childMaxResource is empty, consider parent's max resource alone.
Resource childMaxResource = childQueue.getQueueResourceQuotas()
.getConfiguredMaxResource(label);
Resource effMaxResource = Resources.min(resourceCalculator,
resourceByLabel, childMaxResource.equals(Resources.none())
? parentMaxRes
: childMaxResource,
parentMaxRes);
childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label,
Resources.clone(effMaxResource));
} else {
childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
Resources.multiply(resourceByLabel,
childQueue.getQueueCapacities().getAbsoluteCapacity(label)));
childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label,
Resources.multiply(resourceByLabel, childQueue.getQueueCapacities()
.getAbsoluteMaximumCapacity(label)));
childQueue.getQueueResourceQuotas().setEffectiveMinResourceUp(label,
Resources.multiplyAndNormalizeUp(rc, resourceByLabel,
childQueue.getQueueCapacities().getAbsoluteCapacity(label),
minimumAllocation));
childQueue.getQueueResourceQuotas().setEffectiveMaxResourceUp(label,
Resources.multiplyAndNormalizeUp(rc,
resourceByLabel, childQueue.getQueueCapacities()
.getAbsoluteMaximumCapacity(label),
minimumAllocation));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating effective min resource for queue:"
+ childQueue.getQueueName() + " as effMinResource="
+ childQueue.getQueueResourceQuotas().getEffectiveMinResource(label)
+ "and Updating effective max resource as effMaxResource="
+ childQueue.getQueueResourceQuotas()
.getEffectiveMaxResource(label));
}
}
}
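A worked example of the effectiveMinRatio scaling above, with invented numbers: when the partition offers less than the sum of the children's configured minimums, each absolute-configured child's effective minimum is scaled down proportionally.

import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

// Sketch of the effectiveMinRatio computation for the root-queue case.
public class EffectiveMinRatioSketch {
  public static void main(String[] args) {
    ResourceCalculator rc = new DefaultResourceCalculator();
    Resource partition = Resource.newInstance(8192, 8);         // partition resource
    Resource sumOfChildMins = Resource.newInstance(12288, 12);  // over-committed

    float effectiveMinRatio = 1f;
    if (Resources.lessThan(rc, partition, partition, sumOfChildMins)) {
      effectiveMinRatio =
          Resources.divide(rc, partition, partition, sumOfChildMins); // ~0.67
    }

    // A child that configured <4096 MB, 4 vcores> effectively gets about 2/3 of it.
    Resource childMin = Resource.newInstance(4096, 4);
    System.out.println(Resources.multiply(childMin, effectiveMinRatio));
  }
}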
@Override
public List<CSQueue> getChildQueues() {
try {
@ -980,9 +1138,21 @@ void allocateResource(Resource clusterResource,
* When this happens, we have to preempt killable container (on same or different
* nodes) of parent queue to avoid violating parent's max resource.
*/
if (getQueueCapacities().getAbsoluteMaximumCapacity(nodePartition)
< getQueueCapacities().getAbsoluteUsedCapacity(nodePartition)) {
killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource);
if (!queueResourceQuotas.getEffectiveMaxResource(nodePartition)
.equals(Resources.none())) {
if (Resources.lessThan(resourceCalculator, clusterResource,
queueResourceQuotas.getEffectiveMaxResource(nodePartition),
queueUsage.getUsed(nodePartition))) {
killContainersToEnforceMaxQueueCapacity(nodePartition,
clusterResource);
}
} else {
if (getQueueCapacities()
.getAbsoluteMaximumCapacity(nodePartition) < getQueueCapacities()
.getAbsoluteUsedCapacity(nodePartition)) {
killContainersToEnforceMaxQueueCapacity(nodePartition,
clusterResource);
}
}
} finally {
writeLock.unlock();
@ -999,8 +1169,7 @@ private void killContainersToEnforceMaxQueueCapacity(String partition,
Resource partitionResource = labelManager.getResourceByLabel(partition,
null);
Resource maxResource = Resources.multiply(partitionResource,
getQueueCapacities().getAbsoluteMaximumCapacity(partition));
Resource maxResource = getEffectiveMaxCapacity(partition);
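// Here getEffectiveMaxCapacity(partition) resolves to the queue's effective
// max resource for the partition: the clamped absolute configuration when one
// is present, otherwise the percentage-derived value
// (partitionResource * absoluteMaximumCapacity), so for percentage-configured
// queues the check below is equivalent to the removed computation.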
while (Resources.greaterThan(resourceCalculator, partitionResource,
queueUsage.getUsed(partition), maxResource)) {

View File

@ -686,10 +686,7 @@ private Resource computeUserLimit(String userName, Resource clusterResource,
* * If we're running over capacity, then it's (usedResources + required)
* (which extra resources we are allocating)
*/
Resource queueCapacity = Resources.multiplyAndNormalizeUp(
resourceCalculator, partitionResource,
lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
lQueue.getMinimumAllocation());
Resource queueCapacity = lQueue.getEffectiveCapacityUp(nodePartition);
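// getEffectiveCapacityUp(nodePartition) reads the queue's effective minimum
// resource for the partition, normalized up to the minimum allocation; for
// percentage-based capacities this is exactly what the removed
// multiplyAndNormalizeUp computation produced.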
/*
* Assume we have required resource equals to minimumAllocation, this can

View File

@ -20,9 +20,11 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
@ -121,6 +123,15 @@ public int compare(CSQueue q1, CSQueue q2) {
// For queues with the same used ratio / priority, the queue with higher
// configured capacity goes first
if (0 == rc) {
Resource minEffRes1 = q1.getQueueResourceQuotas()
.getConfiguredMinResource(p);
Resource minEffRes2 = q2.getQueueResourceQuotas()
.getConfiguredMinResource(p);
if (!minEffRes1.equals(Resources.none())
&& !minEffRes2.equals(Resources.none())) {
return minEffRes2.compareTo(minEffRes1);
}
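// Example (illustrative values): at equal used ratio and priority, a queue
// configured with min <memory:102400, vCores:10> sorts ahead of one configured
// with <memory:51200, vCores:10>, since minEffRes2.compareTo(minEffRes1) is
// negative in that case; only when either side has no absolute minimum does
// ordering fall back to absolute capacity, as below.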
float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p);
float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p);
return Float.compare(abs2, abs1);

View File

@ -62,6 +62,8 @@ public class CapacitySchedulerQueueInfo {
protected long pendingContainers;
protected QueueCapacitiesInfo capacities;
protected ResourcesInfo resources;
protected ResourceInfo minEffectiveCapacity;
protected ResourceInfo maxEffectiveCapacity;
CapacitySchedulerQueueInfo() {
};
@ -105,6 +107,11 @@ public class CapacitySchedulerQueueInfo {
ResourceUsage queueResourceUsage = q.getQueueResourceUsage();
populateQueueResourceUsage(queueResourceUsage);
minEffectiveCapacity = new ResourceInfo(
q.getQueueResourceQuotas().getEffectiveMinResource());
maxEffectiveCapacity = new ResourceInfo(
q.getQueueResourceQuotas().getEffectiveMaxResource());
}
protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
@ -200,4 +207,12 @@ public QueueCapacitiesInfo getCapacities() {
public ResourcesInfo getResources() {
return resources;
}
public ResourceInfo getMinEffectiveCapacity() {
return minEffectiveCapacity;
}
public ResourceInfo getMaxEffectiveCapacity() {
return maxEffectiveCapacity;
}
}

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@ -142,6 +143,13 @@ public Map<ApplicationId, AppCollectorData> getRegisteringCollectors() {
return this.registeringCollectors;
}
public void unRegisterNode() throws Exception {
UnRegisterNodeManagerRequest request = Records
.newRecord(UnRegisterNodeManagerRequest.class);
request.setNodeId(nodeId);
resourceTracker.unRegisterNodeManager(request);
}
public RegisterNodeManagerResponse registerNode() throws Exception {
return registerNode(null, null);
}

View File

@ -823,6 +823,12 @@ PrivilegedExceptionAction<SubmitApplicationResponse> setClientReq(
return rmApp;
}
public MockNM unRegisterNode(MockNM nm) throws Exception {
nm.unRegisterNode();
drainEventsImplicitly();
return nm;
}
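// Typical use in tests: register a node, let the scheduler absorb it, then
// unregister it to shrink the cluster and assert that effective queue
// resources are recomputed, e.g.
//   MockNM nm1 = rm.registerNode("127.0.0.1:1234", 125 * GB, 20);
//   ...
//   rm.unRegisterNode(nm1);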
public MockNM registerNode(String nodeIdStr, int memory) throws Exception {
MockNM nm = new MockNM(nodeIdStr, memory, getResourceTrackerService());
nm.registerNode();

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
@ -644,9 +645,11 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
QueueCapacities qc = new QueueCapacities(0 == myLevel);
ResourceUsage ru = new ResourceUsage();
QueueResourceQuotas qr = new QueueResourceQuotas();
when(queue.getQueueCapacities()).thenReturn(qc);
when(queue.getQueueResourceUsage()).thenReturn(ru);
when(queue.getQueueResourceQuotas()).thenReturn(qr);
LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
+ queue.getQueuePath());
@ -679,7 +682,17 @@ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
qc.setAbsoluteMaximumCapacity(partitionName, absMax);
qc.setAbsoluteUsedCapacity(partitionName, absUsed);
qc.setUsedCapacity(partitionName, used);
qr.setEffectiveMaxResource(parseResourceFromString(values[1].trim()));
qr.setEffectiveMinResource(parseResourceFromString(values[0].trim()));
qr.setEffectiveMaxResource(partitionName,
parseResourceFromString(values[1].trim()));
qr.setEffectiveMinResource(partitionName,
parseResourceFromString(values[0].trim()));
when(queue.getUsedCapacity()).thenReturn(used);
when(queue.getEffectiveCapacity(partitionName))
.thenReturn(parseResourceFromString(values[0].trim()));
when(queue.getEffectiveMaxCapacity(partitionName))
.thenReturn(parseResourceFromString(values[1].trim()));
ru.setPending(partitionName, pending);
// Setup reserved resource if it contained by input config
Resource reserved = Resources.none();

View File

@ -36,6 +36,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
@ -437,8 +438,8 @@ public void testPerQueueDisablePreemptionBroadHierarchical() {
policy.editSchedule();
// queueF(appD) wants resources. Verify that resources come from queueE(appC)
// because it's a sibling and queueB(appA) because queueA is over capacity.
verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, times(27)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(23)).handle(argThat(new IsPreemptionRequestFor(appC)));
// Need to call setup() again to reset mDisp
setup();
@ -1204,6 +1205,17 @@ ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
boolean preemptionDisabled = mockPreemptionStatus("root");
when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
QueueResourceQuotas rootQr = new QueueResourceQuotas();
rootQr.setEffectiveMaxResource(Resource.newInstance(maxCap[0], 0));
rootQr.setEffectiveMinResource(abs[0]);
rootQr.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL,
Resource.newInstance(maxCap[0], 0));
rootQr.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, abs[0]);
when(root.getQueueResourceQuotas()).thenReturn(rootQr);
when(root.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL))
.thenReturn(abs[0]);
when(root.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL))
.thenReturn(Resource.newInstance(maxCap[0], 0));
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
@ -1234,6 +1246,18 @@ ParentQueue mockNested(Resource[] abs, int[] maxCap, Resource[] used,
qc.setAbsoluteMaximumCapacity(maxCap[i] / (float) tot.getMemorySize());
when(q.getQueueCapacities()).thenReturn(qc);
QueueResourceQuotas qr = new QueueResourceQuotas();
qr.setEffectiveMaxResource(Resource.newInstance(maxCap[i], 0));
qr.setEffectiveMinResource(abs[i]);
qr.setEffectiveMaxResource(RMNodeLabelsManager.NO_LABEL,
Resource.newInstance(maxCap[i], 0));
qr.setEffectiveMinResource(RMNodeLabelsManager.NO_LABEL, abs[i]);
when(q.getQueueResourceQuotas()).thenReturn(qr);
when(q.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL))
.thenReturn(abs[i]);
when(q.getEffectiveMaxCapacity(RMNodeLabelsManager.NO_LABEL))
.thenReturn(Resource.newInstance(maxCap[i], 0));
String parentPathName = p.getQueuePath();
parentPathName = (parentPathName == null) ? "root" : parentPathName;
String queuePathName = (parentPathName + "." + queueName).replace("/",

View File

@ -67,9 +67,9 @@ public void testSimpleIntraQueuePreemptionWithVCoreResource()
conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
"priority_first");
String labelsConfig = "=100:200,true;";
String labelsConfig = "=100:50,true;";
String nodesConfig = // n1 has no label
"n1= res=100:200";
"n1= res=100:50";
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100:50 100:50 80:40 120:60 0]);" + // root
@ -105,7 +105,7 @@ public void testSimpleIntraQueuePreemptionWithVCoreResource()
verify(mDisp, times(1)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(4))));
verify(mDisp, times(7)).handle(argThat(
verify(mDisp, times(3)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}

View File

@ -0,0 +1,516 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Test;
public class TestAbsoluteResourceConfiguration {
private static final int GB = 1024;
private static final String QUEUEA = "queueA";
private static final String QUEUEB = "queueB";
private static final String QUEUEC = "queueC";
private static final String QUEUEA1 = "queueA1";
private static final String QUEUEA2 = "queueA2";
private static final String QUEUEB1 = "queueB1";
private static final String QUEUEA_FULL = CapacitySchedulerConfiguration.ROOT
+ "." + QUEUEA;
private static final String QUEUEB_FULL = CapacitySchedulerConfiguration.ROOT
+ "." + QUEUEB;
private static final String QUEUEC_FULL = CapacitySchedulerConfiguration.ROOT
+ "." + QUEUEC;
private static final String QUEUEA1_FULL = QUEUEA_FULL + "." + QUEUEA1;
private static final String QUEUEA2_FULL = QUEUEA_FULL + "." + QUEUEA2;
private static final String QUEUEB1_FULL = QUEUEB_FULL + "." + QUEUEB1;
private static final Resource QUEUE_A_MINRES = Resource.newInstance(100 * GB,
10);
private static final Resource QUEUE_A_MAXRES = Resource.newInstance(200 * GB,
30);
private static final Resource QUEUE_A1_MINRES = Resource.newInstance(50 * GB,
5);
private static final Resource QUEUE_A2_MINRES = Resource.newInstance(50 * GB,
5);
private static final Resource QUEUE_B_MINRES = Resource.newInstance(50 * GB,
10);
private static final Resource QUEUE_B1_MINRES = Resource.newInstance(40 * GB,
10);
private static final Resource QUEUE_B_MAXRES = Resource.newInstance(150 * GB,
30);
private static final Resource QUEUE_C_MINRES = Resource.newInstance(50 * GB,
10);
private static final Resource QUEUE_C_MAXRES = Resource.newInstance(150 * GB,
20);
private static final Resource QUEUEA_REDUCED = Resource.newInstance(64000, 6);
private static final Resource QUEUEB_REDUCED = Resource.newInstance(32000, 6);
private static final Resource QUEUEC_REDUCED = Resource.newInstance(32000, 6);
private static final Resource QUEUEMAX_REDUCED = Resource.newInstance(128000,
20);
private static Set<String> resourceTypes = new HashSet<>(
Arrays.asList("memory", "vcores"));
private CapacitySchedulerConfiguration setupSimpleQueueConfiguration(
boolean isCapacityNeeded) {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
// Set default capacities like normal configuration.
if (isCapacityNeeded) {
csConf.setCapacity(QUEUEA_FULL, 50f);
csConf.setCapacity(QUEUEB_FULL, 25f);
csConf.setCapacity(QUEUEC_FULL, 25f);
}
return csConf;
}
private CapacitySchedulerConfiguration setupComplexQueueConfiguration(
boolean isCapacityNeeded) {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
csConf.setQueues(QUEUEA_FULL, new String[]{QUEUEA1, QUEUEA2});
csConf.setQueues(QUEUEB_FULL, new String[]{QUEUEB1});
// Set default capacities like normal configuration.
if (isCapacityNeeded) {
csConf.setCapacity(QUEUEA_FULL, 50f);
csConf.setCapacity(QUEUEB_FULL, 25f);
csConf.setCapacity(QUEUEC_FULL, 25f);
csConf.setCapacity(QUEUEA1_FULL, 50f);
csConf.setCapacity(QUEUEA2_FULL, 50f);
csConf.setCapacity(QUEUEB1_FULL, 100f);
}
return csConf;
}
private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
CapacitySchedulerConfiguration csConf) {
// Update min/max resource to queueA/B/C
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MINRES);
csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MAXRES);
return csConf;
}
private CapacitySchedulerConfiguration setupComplexMinMaxResourceConfig(
CapacitySchedulerConfiguration csConf) {
// Update min/max resources for queueA/B/C and min resources for queueA1/A2/B1
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEA1_FULL, QUEUE_A1_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEA2_FULL, QUEUE_A2_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEB1_FULL, QUEUE_B1_MINRES);
csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MAXRES);
return csConf;
}
@Test
public void testSimpleMinMaxResourceConfigurationPerQueue() {
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(true);
setupMinMaxResourceConfiguration(csConf);
Assert.assertEquals("Min resource configured for QUEUEA is not correct",
QUEUE_A_MINRES,
csConf.getMinimumResourceRequirement("", QUEUEA_FULL, resourceTypes));
Assert.assertEquals("Max resource configured for QUEUEA is not correct",
QUEUE_A_MAXRES,
csConf.getMaximumResourceRequirement("", QUEUEA_FULL, resourceTypes));
Assert.assertEquals("Min resource configured for QUEUEB is not correct",
QUEUE_B_MINRES,
csConf.getMinimumResourceRequirement("", QUEUEB_FULL, resourceTypes));
Assert.assertEquals("Max resource configured for QUEUEB is not correct",
QUEUE_B_MAXRES,
csConf.getMaximumResourceRequirement("", QUEUEB_FULL, resourceTypes));
Assert.assertEquals("Min resource configured for QUEUEC is not correct",
QUEUE_C_MINRES,
csConf.getMinimumResourceRequirement("", QUEUEC_FULL, resourceTypes));
Assert.assertEquals("Max resource configured for QUEUEC is not correct",
QUEUE_C_MAXRES,
csConf.getMaximumResourceRequirement("", QUEUEC_FULL, resourceTypes));
}
@Test
public void testEffectiveMinMaxResourceConfigurationPerQueue()
throws Exception {
// create conf with basic queue configuration.
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(
false);
setupMinMaxResourceConfiguration(csConf);
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@SuppressWarnings("resource")
MockRM rm = new MockRM(csConf);
rm.start();
// Add few nodes
rm.registerNode("127.0.0.1:1234", 250 * GB, 40);
// Get queue object to verify min/max resource configuration.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qA = (LeafQueue) cs.getQueue(QUEUEA);
Assert.assertNotNull(qA);
Assert.assertEquals("Min resource configured for QUEUEA is not correct",
QUEUE_A_MINRES, qA.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEA is not correct",
QUEUE_A_MAXRES, qA.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEA is not correct",
QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEA is not correct",
QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qB = (LeafQueue) cs.getQueue(QUEUEB);
Assert.assertNotNull(qB);
Assert.assertEquals("Min resource configured for QUEUEB is not correct",
QUEUE_B_MINRES, qB.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEB is not correct",
QUEUE_B_MAXRES, qB.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEB is not correct",
QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEB is not correct",
QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC);
Assert.assertNotNull(qC);
Assert.assertEquals("Min resource configured for QUEUEC is not correct",
QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEC is not correct",
QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource());
rm.stop();
}
@Test
public void testSimpleValidateAbsoluteResourceConfig() throws Exception {
/**
* Queue structure is as follows:
*
*           root
*          /  |  \
*         a   b   c
*        / \  |
*       a1 a2 b1
*
* Test the following cases:
* 1) Configure percentage based capacity and absolute resource together.
* 2) As per the above tree structure, ensure all values can be retrieved.
* 3) Validate that a queue's min resource cannot be more than its max
*    resource.
* 4) Validate that a queue's max resource cannot be more than its parent's
*    max resource.
*/
// create conf with basic queue configuration.
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(
false);
setupMinMaxResourceConfiguration(csConf);
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@SuppressWarnings("resource")
MockRM rm = new MockRM(csConf);
rm.start();
// Add few nodes
rm.registerNode("127.0.0.1:1234", 250 * GB, 40);
// Get queue object to verify min/max resource configuration.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
// 1. Create a new config with capacity and min/max together. Ensure an
// exception is thrown.
CapacitySchedulerConfiguration csConf1 = setupSimpleQueueConfiguration(
true);
setupMinMaxResourceConfiguration(csConf1);
try {
cs.reinitialize(csConf1, rm.getRMContext());
Assert.fail();
} catch (IOException e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(
"Failed to re-init queues : Queue 'queueA' should use either"
+ " percentage based capacity configuration or absolute resource.",
e.getMessage());
}
rm.stop();
// 2. Create a new config with min/max resources alone on a complex queue hierarchy.
// Check all values could be fetched correctly.
CapacitySchedulerConfiguration csConf2 = setupComplexQueueConfiguration(
false);
setupComplexMinMaxResourceConfig(csConf2);
rm = new MockRM(csConf2);
rm.start();
rm.registerNode("127.0.0.1:1234", 250 * GB, 40);
cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qA1 = (LeafQueue) cs.getQueue(QUEUEA1);
Assert.assertEquals("Effective Min resource for QUEUEA1 is not correct",
QUEUE_A1_MINRES, qA1.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEA1 is not correct",
QUEUE_A_MAXRES, qA1.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qA2 = (LeafQueue) cs.getQueue(QUEUEA2);
Assert.assertEquals("Effective Min resource for QUEUEA2 is not correct",
QUEUE_A2_MINRES, qA2.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEA2 is not correct",
QUEUE_A_MAXRES, qA2.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qB1 = (LeafQueue) cs.getQueue(QUEUEB1);
Assert.assertNotNull(qB1);
Assert.assertEquals("Min resource configured for QUEUEB1 is not correct",
QUEUE_B1_MINRES, qB1.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEB1 is not correct",
QUEUE_B_MAXRES, qB1.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEB1 is not correct",
QUEUE_B1_MINRES, qB1.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEB1 is not correct",
QUEUE_B_MAXRES, qB1.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC);
Assert.assertNotNull(qC);
Assert.assertEquals("Min resource configured for QUEUEC is not correct",
QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEC is not correct",
QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource());
// 3. Create a new config and make sure one queue's min resource is more
// than its max resource configured.
CapacitySchedulerConfiguration csConf3 = setupComplexQueueConfiguration(
false);
setupComplexMinMaxResourceConfig(csConf3);
csConf3.setMinimumResourceRequirement("", QUEUEB1_FULL, QUEUE_B_MAXRES);
csConf3.setMaximumResourceRequirement("", QUEUEB1_FULL, QUEUE_B1_MINRES);
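// The swap above leaves queueB1 with min <memory:153600, vCores:30>
// (QUEUE_B_MAXRES) against max <memory:40960, vCores:10> (QUEUE_B1_MINRES),
// which must be rejected on reinitialization.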
try {
cs.reinitialize(csConf3, rm.getRMContext());
Assert.fail();
} catch (IOException e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(
"Failed to re-init queues : Min resource configuration "
+ "<memory:153600, vCores:30> is greater than its "
+ "max value:<memory:40960, vCores:10> in queue:queueB1",
e.getMessage());
}
// 4. Create a new config and make sure one queue's max resource is more
// than its parent's configured max resource.
CapacitySchedulerConfiguration csConf4 = setupComplexQueueConfiguration(
false);
setupComplexMinMaxResourceConfig(csConf4);
csConf4.setMaximumResourceRequirement("", QUEUEB1_FULL, QUEUE_A_MAXRES);
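// queueB1's max is now QUEUE_A_MAXRES <memory:204800, vCores:30>, which
// exceeds parent queueB's max <memory:153600, vCores:30> and must be rejected.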
try {
cs.reinitialize(csConf4, rm.getRMContext());
Assert.fail();
} catch (IOException e) {
Assert.assertTrue(e instanceof IOException);
Assert
.assertEquals(
"Failed to re-init queues : Max resource configuration "
+ "<memory:204800, vCores:30> is greater than parents max value:"
+ "<memory:153600, vCores:30> in queue:queueB1",
e.getMessage());
}
rm.stop();
}
@Test
public void testComplexValidateAbsoluteResourceConfig() throws Exception {
/**
* Queue structure is as follows:
*
*           root
*          /  |  \
*         a   b   c
*        / \  |
*       a1 a2 b1
*
* Test the following cases:
* 1) A parent and its child queues must use either percentage based or
*    absolute resource configuration, not a mix.
* 2) A parent's min resource must not be less than the sum of its
*    children's min resources.
*/
// create conf with basic queue configuration.
CapacitySchedulerConfiguration csConf = setupComplexQueueConfiguration(
false);
setupComplexMinMaxResourceConfig(csConf);
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@SuppressWarnings("resource")
MockRM rm = new MockRM(csConf);
rm.start();
// Add few nodes
rm.registerNode("127.0.0.1:1234", 250 * GB, 40);
// 1. Explicitly set percentage based config for queues A, B and C. This
// makes A, B and C percentage based while A1, A2 and B1 remain absolute
// resource based.
csConf.setCapacity(QUEUEA_FULL, 50f);
csConf.setCapacity(QUEUEB_FULL, 25f);
csConf.setCapacity(QUEUEC_FULL, 25f);
// Also unset resource based config.
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, Resources.none());
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, Resources.none());
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, Resources.none());
// Get queue object to verify min/max resource configuration.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
try {
cs.reinitialize(csConf, rm.getRMContext());
Assert.fail();
} catch (IOException e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(
"Failed to re-init queues : Parent queue 'queueA' "
+ "and child queue 'queueA1' should use either percentage based"
+ " capacity configuration or absolute resource together.",
e.getMessage());
}
// 2. Create a new config where a parent queue's min resource is less than
// the sum of its children's min resources.
CapacitySchedulerConfiguration csConf1 = setupComplexQueueConfiguration(
false);
setupComplexMinMaxResourceConfig(csConf1);
// Configure QueueA with lesser resource than its children.
csConf1.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A1_MINRES);
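// queueA's min is now QUEUE_A1_MINRES <memory:51200, vCores:5>, which is less
// than the sum of its children's minimums <memory:102400, vCores:10>
// (queueA1 + queueA2), so reinitialization must fail.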
try {
cs.reinitialize(csConf1, rm.getRMContext());
Assert.fail();
} catch (IOException e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals("Failed to re-init queues : Parent Queues capacity: "
+ "<memory:51200, vCores:5> is less than to its children:"
+ "<memory:102400, vCores:10> for queue:queueA", e.getMessage());
}
}
@Test
public void testEffectiveResourceAfterReducingClusterResource()
throws Exception {
// create conf with basic queue configuration.
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(
false);
setupMinMaxResourceConfiguration(csConf);
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@SuppressWarnings("resource")
MockRM rm = new MockRM(csConf);
rm.start();
// Add few nodes
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 125 * GB, 20);
rm.registerNode("127.0.0.2:1234", 125 * GB, 20);
// Get queue object to verify min/max resource configuration.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qA = (LeafQueue) cs.getQueue(QUEUEA);
Assert.assertNotNull(qA);
Assert.assertEquals("Min resource configured for QUEUEA is not correct",
QUEUE_A_MINRES, qA.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEA is not correct",
QUEUE_A_MAXRES, qA.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEA is not correct",
QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEA is not correct",
QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qB = (LeafQueue) cs.getQueue(QUEUEB);
Assert.assertNotNull(qB);
Assert.assertEquals("Min resource configured for QUEUEB is not correct",
QUEUE_B_MINRES, qB.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEB is not correct",
QUEUE_B_MAXRES, qB.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEB is not correct",
QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEB is not correct",
QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource());
LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC);
Assert.assertNotNull(qC);
Assert.assertEquals("Min resource configured for QUEUEC is not correct",
QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals("Max resource configured for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEC is not correct",
QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource());
// unregister one NM.
rm.unRegisterNode(nm1);
// After losing one NM, the configured minimums no longer fit, so queueA's
// effective min becomes <memory:64000, vCores:6> and its effective max is
// capped at <memory:128000, vCores:20>.
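// Roughly: the remaining cluster is 125 GB / 20 vcores against configured
// minimums totalling 200 GB / 30 vcores, so every queue's effective min is
// scaled down proportionally (queueB and queueC land at
// <memory:32000, vCores:6>) and every effective max is clipped to the
// remaining cluster resource.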
Assert.assertEquals("Effective Min resource for QUEUEA is not correct",
QUEUEA_REDUCED, qA.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEA is not correct",
QUEUEMAX_REDUCED, qA.queueResourceQuotas.getEffectiveMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEB is not correct",
QUEUEB_REDUCED, qB.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEB is not correct",
QUEUEMAX_REDUCED, qB.queueResourceQuotas.getEffectiveMaxResource());
Assert.assertEquals("Effective Min resource for QUEUEC is not correct",
QUEUEC_REDUCED, qC.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals("Effective Max resource for QUEUEC is not correct",
QUEUEMAX_REDUCED, qC.queueResourceQuotas.getEffectiveMaxResource());
rm.stop();
}
}

View File

@ -60,6 +60,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@ -86,6 +87,7 @@ public class TestApplicationLimits {
final static int GB = 1024;
LeafQueue queue;
CSQueue root;
private final ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
@ -100,7 +102,7 @@ public void setUp() throws IOException {
setupQueueConfiguration(csConf);
rmContext = TestUtils.getMockRMContext();
Resource clusterResource = Resources.createResource(10 * 16 * GB, 10 * 32);
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConfiguration()).thenReturn(csConf);
@ -110,10 +112,11 @@ public void setUp() throws IOException {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
thenReturn(clusterResource);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
RMContainerTokenSecretManager containerTokenSecretManager =
new RMContainerTokenSecretManager(conf);
@ -122,13 +125,17 @@ public void setUp() throws IOException {
containerTokenSecretManager);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root = CapacitySchedulerQueueManager
root = CapacitySchedulerQueueManager
.parseQueue(csContext, csConf, null, "root",
queues, queues,
TestUtils.spyHook);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
queue = spy(new LeafQueue(csContext, A, root, null));
QueueResourceQuotas queueResourceQuotas = ((LeafQueue) queues.get(A))
.getQueueResourceQuotas();
doReturn(queueResourceQuotas).when(queue).getQueueResourceQuotas();
// Stub out ACL checks
doReturn(true).
@ -189,6 +196,8 @@ public void testAMResourceLimit() throws Exception {
// when there is only 1 user, and drops to 2G (the userlimit) when there
// is a second user
Resource clusterResource = Resource.newInstance(80 * GB, 40);
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
queue.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
@ -287,6 +296,8 @@ public void testLimitsComputation() throws Exception {
CSQueue root =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
"root", queues, queues, TestUtils.spyHook);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
LeafQueue queue = (LeafQueue)queues.get(A);
@ -357,6 +368,8 @@ public void testLimitsComputation() throws Exception {
csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
clusterResource = Resources.createResource(100 * 16 * GB);
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
queue = (LeafQueue)queues.get(A);
@ -378,6 +391,8 @@ public void testLimitsComputation() throws Exception {
root = CapacitySchedulerQueueManager.parseQueue(
csContext, csConf, null, "root",
queues, queues, TestUtils.spyHook);
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
queue = (LeafQueue)queues.get(A);
assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
@ -578,6 +593,7 @@ public void testHeadroom() throws Exception {
thenReturn(Resources.createResource(16*GB));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
// Say cluster has 100 nodes of 16G each
Resource clusterResource = Resources.createResource(100 * 16 * GB);
@ -586,6 +602,8 @@ public void testHeadroom() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
@ -693,6 +711,8 @@ public void testHeadroom() throws Exception {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Any change in cluster resource needs to enforce user-limit recomputation.
// In existing code, LeafQueue#updateClusterResource handled this. However

View File

@ -54,6 +54,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -600,6 +601,7 @@ public void testHeadroom() throws Exception {
RMContext spyRMContext = spy(rmContext);
when(spyRMContext.getNodeLabelManager()).thenReturn(mgr);
when(csContext.getRMContext()).thenReturn(spyRMContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
mgr.activateNode(NodeId.newInstance("h0", 0),
Resource.newInstance(160 * GB, 16)); // default Label
@ -615,6 +617,8 @@ public void testHeadroom() throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, "root", queues, queues, TestUtils.spyHook);
rootQueue.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())

View File

@ -4307,7 +4307,7 @@ public void testCSReservationWithRootUnblocked() throws Exception {
null, null, NULL_UPDATE_REQUESTS);
CapacityScheduler.schedule(cs);
}
assertEquals("P2 Used Resource should be 8 GB", 8 * GB,
assertEquals("P2 Used Resource should be 7 GB", 7 * GB,
cs.getQueue("p2").getUsedResources().getMemorySize());
//Free a container from X1

View File

@ -242,6 +242,8 @@ public void testSortedQueues() throws Exception {
Resources.createResource(numNodes * (memoryPerNode*GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Start testing
CSQueue a = queues.get(A);

View File

@ -191,6 +191,8 @@ private void setUpInternal(ResourceCalculator rC) throws Exception {
CapacitySchedulerConfiguration.ROOT,
queues, queues,
TestUtils.spyHook);
root.updateClusterResource(Resources.createResource(100 * 16 * GB, 100 * 32),
new ResourceLimits(Resources.createResource(100 * 16 * GB, 100 * 32)));
ResourceUsage queueResUsage = root.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
@ -307,13 +309,11 @@ public void testInitializeQueue() throws Exception {
// Verify the value for getAMResourceLimit for queues with < .1 maxcap
Resource clusterResource = Resource.newInstance(50 * GB, 50);
a.updateClusterResource(clusterResource,
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
assertEquals(Resource.newInstance(1 * GB, 1),
a.calculateAndGetAMResourceLimit());
b.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
assertEquals(Resource.newInstance(5 * GB, 1),
b.calculateAndGetAMResourceLimit());
}
@ -358,6 +358,8 @@ public void testSingleQueueOneUserMetrics() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
@ -556,6 +558,8 @@ public void testSingleQueueWithOneUser() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
@ -630,6 +634,8 @@ public void testSingleQueueWithOneUser() throws Exception {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
@ -699,6 +705,8 @@ public void testDRFUsageRatioRounding() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (80 * GB), numNodes * 100);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Set user-limit. Need a small queue within a large cluster.
b.setUserLimit(50);
@ -779,6 +787,8 @@ public void testDRFUserLimits() throws Exception {
Resources.createResource(numNodes * (8 * GB), numNodes * 100);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests so that one application is memory dominant
// and other application is vcores dominant
@ -891,6 +901,8 @@ public void testUserLimits() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
@ -915,6 +927,8 @@ public void testUserLimits() throws Exception {
// Set user-limit
a.setUserLimit(50);
a.setUserLimitFactor(2);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// There're two active users
assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
@ -940,7 +954,7 @@ public void testUserLimits() throws Exception {
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
// Allocate one container to app_0, before allocating this container,
// user-limit = ceil((4 + 1) / 2) = 3G. app_0's used resource (3G) <=
// user-limit = floor((5 + 1) / 2) = 3G. app_0's used resource (3G) <=
// user-limit.
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_1,
@ -1068,15 +1082,9 @@ public void testUserSpecificUserLimits() throws Exception {
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(9*GB, a.getUsedResources().getMemorySize());
assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(4*GB,
app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());
assertEquals(1*GB,
app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
assertEquals(12*GB, a.getUsedResources().getMemorySize());
assertEquals(12*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@ -1100,6 +1108,8 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
CapacitySchedulerQueueManager mockCapacitySchedulerQueueManager
= mock(CapacitySchedulerQueueManager.class);
@ -1122,6 +1132,8 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
qb.setUserLimit(100);
qb.setUserLimitFactor(1);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
@ -1255,106 +1267,6 @@ public void testComputeUserLimitAndSetHeadroom() throws IOException {
assertEquals(1*GB, app_4.getHeadroom().getMemorySize());
}
@Test
public void testUserHeadroomMultiApp() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes
String host_0 = "127.0.0.1";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK,
0, 16*GB);
String host_1 = "127.0.0.2";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK,
0, 16*GB);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
app_1, app_2.getApplicationAttemptId(), app_2);
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
node_0, node_1.getNodeID(), node_1);
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(1*GB, a.getUsedResources().getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
//Now, headroom is the same for all apps for a given user + queue combo
//and a change to any app's headroom is reflected for all the user's apps
//once those apps are active/have themselves calculated headroom for
//allocation at least one time
assertEquals(2*GB, app_0.getHeadroom().getMemorySize());
assertEquals(0*GB, app_1.getHeadroom().getMemorySize());//not yet active
assertEquals(0*GB, app_2.getHeadroom().getMemorySize());//not yet active
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(2*GB, a.getUsedResources().getMemorySize());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
assertEquals(1*GB, app_1.getHeadroom().getMemorySize());//now active
assertEquals(0*GB, app_2.getHeadroom().getMemorySize());//not yet active
//Complete container and verify that headroom is updated, for both apps
//for the user
RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
a.completedContainer(clusterResource, app_0, node_0, rmContainer,
ContainerStatus.newInstance(rmContainer.getContainerId(),
ContainerState.COMPLETE, "",
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
assertEquals(2*GB, app_0.getHeadroom().getMemorySize());
assertEquals(2*GB, app_1.getHeadroom().getMemorySize());
}
@Test
public void testHeadroomWithMaxCap() throws Exception {
// Mock the queue
@ -1404,6 +1316,11 @@ public void testHeadroomWithMaxCap() throws Exception {
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
ParentQueue root = (ParentQueue) queues
.get(CapacitySchedulerConfiguration.ROOT);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
@ -1454,6 +1371,8 @@ public void testHeadroomWithMaxCap() throws Exception {
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
@ -1542,6 +1461,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getClusterResource()).thenReturn(clusterResource);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
@ -1624,6 +1545,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Test max-capacity
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
@ -1638,6 +1561,8 @@ public void testSingleQueueWithMultipleUsers() throws Exception {
// Now, allocations should goto app_3 since it's under user-limit
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
@ -1743,6 +1668,8 @@ public void testReservation() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (4*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
@ -1880,6 +1807,8 @@ public void testReservationExchange() throws Exception {
final int numNodes = 3;
Resource clusterResource =
Resources.createResource(numNodes * (4*GB), numNodes * 16);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(4*GB, 16));
@ -2051,6 +1980,8 @@ public void testLocalityScheduling() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests and submit
Priority priority = TestUtils.createMockPriority(1);
@ -2237,11 +2168,10 @@ public void testRackLocalityDelayScheduling() throws Exception {
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
// Manipulate queue 'b'
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B));
LeafQueue a = stubLeafQueue((LeafQueue) newQueues.get(B));
// Check locality parameters.
assertEquals(2, a.getNodeLocalityDelay());
@ -2277,6 +2207,8 @@ public void testRackLocalityDelayScheduling() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 16);
when(spyRMContext.getScheduler().getNumClusterNodes()).thenReturn(numNodes);
newRoot.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests and submit
Priority priority = TestUtils.createMockPriority(1);
@ -2412,6 +2344,8 @@ public void testApplicationPriorityScheduling() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests and submit
List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
@ -2545,6 +2479,8 @@ public void testSchedulingConstraints() throws Exception {
Resource clusterResource = Resources.createResource(
numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests and submit
Priority priority = TestUtils.createMockPriority(1);
@ -2660,17 +2596,14 @@ public void testActivateApplicationAfterQueueRefresh() throws Exception {
assertEquals(2, e.getNumActiveApplications());
assertEquals(1, e.getNumPendingApplications());
csConf.setDouble(CapacitySchedulerConfiguration
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
CapacitySchedulerConfiguration
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
csConf.setDouble(
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT
* 2);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
root.reinitialize(newRoot, csContext.getClusterResource());
// after reinitialization
@ -2697,7 +2630,6 @@ public void testLocalityDelaysAfterQueueRefresh() throws Exception {
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
// after reinitialization
@ -2745,7 +2677,7 @@ public void testActivateApplicationByUpdatingClusterResource()
assertEquals(1, e.getNumPendingApplications());
Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32);
e.updateClusterResource(clusterResource,
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// after updating cluster resource
@ -2837,6 +2769,9 @@ public void testLocalityConstraints() throws Exception {
numNodes * (8*GB), numNodes * 1);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
// resourceName: <priority, memory, #containers, relaxLocality>
// host_0_0: < 1, 1GB, 1, true >
@ -3036,36 +2971,44 @@ public void testLocalityConstraints() throws Exception {
@Test
public void testMaxAMResourcePerQueuePercentAfterQueueRefresh()
throws Exception {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
Resource clusterResource = Resources
.createResource(100 * 16 * GB, 100 * 32);
final String newRootName = "root" + System.currentTimeMillis();
setupQueueConfiguration(csConf, newRootName);
Resource clusterResource = Resources.createResource(100 * 16 * GB,
100 * 32);
CapacitySchedulerContext csContext = mockCSContext(csConf, clusterResource);
when(csContext.getRMContext()).thenReturn(rmContext);
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.1f);
ParentQueue root = new ParentQueue(csContext,
CapacitySchedulerConfiguration.ROOT, null, null);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
LeafQueue a = new LeafQueue(csContext, A, root, null);
assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
assertEquals(a.calculateAndGetAMResourceLimit(),
Resources.createResource(160 * GB, 1));
csConf.setFloat(
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
0.1f);
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
LeafQueue newA = new LeafQueue(csContext, A, root, null);
a.reinitialize(newA, clusterResource);
assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
assertEquals(a.calculateAndGetAMResourceLimit(),
CSQueue root;
root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Manipulate queue 'b'
LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
assertEquals(0.1f, b.getMaxAMResourcePerQueuePercent(), 1e-3f);
assertEquals(b.calculateAndGetAMResourceLimit(),
Resources.createResource(159 * GB, 1));
csConf.setFloat(
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
0.2f);
clusterResource = Resources.createResource(100 * 20 * GB, 100 * 32);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
root.reinitialize(newRoot, clusterResource);
b = stubLeafQueue((LeafQueue) newQueues.get(B));
assertEquals(b.calculateAndGetAMResourceLimit(),
Resources.createResource(320 * GB, 1));
Resource newClusterResource = Resources.createResource(100 * 20 * GB,
100 * 32);
a.updateClusterResource(newClusterResource,
new ResourceLimits(newClusterResource));
// 100 * 20 * 0.2 = 400
assertEquals(a.calculateAndGetAMResourceLimit(),
Resources.createResource(400 * GB, 1));
}
@Test
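Both the removed and the rewritten form of the test above assert the same relationship: a leaf queue's AM resource limit scales with the memory the queue can actually use multiplied by maximum-am-resource-percent. The inline "100 * 20 * 0.2 = 400" arithmetic can be checked in isolation; the snippet below is a hypothetical standalone check, not the scheduler's exact computation (which also normalizes to the minimum allocation and works from the queue's effective capacity):

    // Back-of-the-envelope check of the AM-limit arithmetic quoted above.
    public class AmLimitCheck {
      public static void main(String[] args) {
        long clusterMemGB = 100 * 20;    // 100 nodes * 20 GB after the update
        double maxAMPercent = 0.2;       // refreshed maximum-am-resource-percent
        long expectedGB = (long) (clusterMemGB * maxAMPercent);
        System.out.println(expectedGB);  // 400 -> Resources.createResource(400 * GB, 1)
      }
    }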
@ -3142,6 +3085,8 @@ public void testFifoAssignment() throws Exception {
Resource clusterResource = Resources.createResource(numNodes * (16 * GB),
numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
String user_0 = "user_0";
@ -3308,6 +3253,8 @@ public void testFairAssignment() throws Exception {
Resource clusterResource = Resources.createResource(
numNodes * (16*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
String user_0 = "user_0";
@ -3435,6 +3382,8 @@ public void testLocalityDelaySkipsApplication() throws Exception {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests and submit
// App0 has node local request for host_0/host_1, and app1 has node local
@ -3533,6 +3482,8 @@ public void testGetTotalPendingResourcesConsideringUserLimitOneUser()
Resource clusterResource =
Resources.createResource(numNodes * (100*GB), numNodes * 128);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Pending resource requests for app_0 and app_1 total 5GB.
Priority priority = TestUtils.createMockPriority(1);
@ -3699,6 +3650,8 @@ public void testGetTotalPendingResourcesConsideringUserLimitTwoUsers()
Resource clusterResource =
Resources.createResource(numNodes * (100*GB), numNodes * 128);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Pending resource requests for user_0: app_0 and app_1 total 3GB.
Priority priority = TestUtils.createMockPriority(1);

View File

@ -249,6 +249,8 @@ public void testSingleLevelQueues() throws Exception {
Resources.createResource(numNodes * (memoryPerNode*GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Start testing
LeafQueue a = (LeafQueue)queues.get(A);
@ -494,6 +496,8 @@ public void testMultiLevelQueues() throws Exception {
Resources.createResource(numNodes * (memoryPerNode*GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Start testing
CSQueue a = queues.get(A);
@ -710,6 +714,8 @@ public void testOffSwitchScheduling() throws Exception {
Resources.createResource(numNodes * (memoryPerNode*GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Start testing
LeafQueue a = (LeafQueue)queues.get(A);
@ -790,6 +796,8 @@ public void testOffSwitchSchedulingMultiLevelQueues() throws Exception {
Resources.createResource(numNodes * (memoryPerNode*GB),
numNodes * coresPerNode);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Start testing
LeafQueue b3 = (LeafQueue)queues.get(B3);

View File

@ -267,6 +267,8 @@ public void testReservation() throws Exception {
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
@ -454,6 +456,8 @@ public void testReservationLimitOtherUsers() throws Exception {
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
@ -600,6 +604,8 @@ public void testReservationNoContinueLook() throws Exception {
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
@ -782,6 +788,8 @@ public void testAssignContainersNeedToUnreserve() throws Exception {
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
@ -897,6 +905,10 @@ public void testGetAppToUnreserve() throws Exception {
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
8 * GB);
Resource clusterResource = Resources.createResource(2 * 8 * GB);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority p = TestUtils.createMockPriority(5);
@ -1068,6 +1080,8 @@ public void testAssignToQueue() throws Exception {
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
@ -1256,6 +1270,8 @@ public void testAssignToUser() throws Exception {
final int numNodes = 2;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);
@ -1418,6 +1434,9 @@ public void testReservationsNoneAvailable() throws Exception {
final int numNodes = 3;
Resource clusterResource = Resources.createResource(numNodes * (8 * GB));
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priorityAM = TestUtils.createMockPriority(1);

View File

@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableTable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.junit.Assert;
@ -52,6 +53,8 @@ private List<CSQueue> mockCSQueues(String[] queueNames, int[] priorities,
when(q.getQueueCapacities()).thenReturn(qc);
when(q.getPriority()).thenReturn(Priority.newInstance(priorities[i]));
QueueResourceQuotas qr = new QueueResourceQuotas();
when(q.getQueueResourceQuotas()).thenReturn(qr);
list.add(q);
}
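The extra stub here reflects that code exercised through these mocked CSQueues now also reads per-queue resource quotas (configured and effective min/max), so the mock has to hand back a QueueResourceQuotas instance rather than Mockito's default null. A minimal sketch of the extended mocking pattern, assuming Mockito and a QueueCapacities(boolean) constructor as used elsewhere in this helper:

    // Sketch of the extended mock; names follow the surrounding helper.
    CSQueue q = mock(CSQueue.class);
    QueueCapacities qc = new QueueCapacities(false);
    when(q.getQueueCapacities()).thenReturn(qc);
    // New: callers may consult the queue's resource quotas, so supply one.
    QueueResourceQuotas qr = new QueueResourceQuotas();
    when(q.getQueueResourceQuotas()).thenReturn(qr);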

View File

@ -354,10 +354,10 @@ private void verifyClusterSchedulerGeneric(String type, float usedCapacity,
private void verifySubQueue(JSONObject info, String q,
float parentAbsCapacity, float parentAbsMaxCapacity)
throws JSONException, Exception {
int numExpectedElements = 18;
int numExpectedElements = 20;
boolean isParentQueue = true;
if (!info.has("queues")) {
numExpectedElements = 31;
numExpectedElements = 33;
isParentQueue = false;
}
assertEquals("incorrect number of elements", numExpectedElements, info.length());