YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track capacities-by-label. Contributed by Wangda Tan

This commit is contained in:
Jian He 2015-02-12 14:58:09 -08:00
parent 11d8934463
commit 18a594257e
17 changed files with 416 additions and 551 deletions

View File

@ -281,6 +281,9 @@ Release 2.7.0 - UNRELEASED
YARN-3181. FairScheduler: Fix up outdated findbugs issues. (kasha)
YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track
capacities-by-label. (Wangda Tan via jianhe)
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -194,6 +194,8 @@
<Field name="absoluteNodeLabelCapacities" />
<Field name="reservationsContinueLooking" />
<Field name="absoluteCapacityByNodeLabels" />
<Field name="authorizer" />
<Field name="parent" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>

View File

@ -44,23 +44,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
CSQueue parent;
final String queueName;
float capacity;
float maximumCapacity;
float absoluteCapacity;
float absoluteMaxCapacity;
float absoluteUsedCapacity = 0.0f;
float usedCapacity = 0.0f;
volatile int numContainers;
final Resource minimumAllocation;
Resource minimumAllocation;
Resource maximumAllocation;
QueueState state;
final QueueMetrics metrics;
@ -70,10 +62,6 @@ public abstract class AbstractCSQueue implements CSQueue {
Set<String> accessibleLabels;
RMNodeLabelsManager labelManager;
String defaultLabelExpression;
Map<String, Float> absoluteCapacityByNodeLabels;
Map<String, Float> capacitiyByNodeLabels;
Map<String, Float> absoluteMaxCapacityByNodeLabels;
Map<String, Float> maxCapacityByNodeLabels;
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
@ -83,15 +71,17 @@ public abstract class AbstractCSQueue implements CSQueue {
// Track resource usage-by-label like used-resource/pending-resource, etc.
ResourceUsage queueUsage;
// Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
// etc.
QueueCapacities queueCapacities;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext csContext;
protected CapacitySchedulerContext csContext;
protected YarnAuthorizationProvider authorizer = null;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
this.minimumAllocation = cs.getMinimumResourceCapability();
this.maximumAllocation = cs.getMaximumResourceCapability();
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queueName = queueName;
@ -102,68 +92,55 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
QueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics(),
cs.getConf());
// get labels
this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath());
this.defaultLabelExpression = cs.getConfiguration()
.getDefaultNodeLabelExpression(getQueuePath());
// inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
this.accessibleLabels = parent.getAccessibleNodeLabels();
}
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
this.accessibleLabels);
// inherit from parent if labels not set
if (this.defaultLabelExpression == null && parent != null
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
}
// set capacity by labels
capacitiyByNodeLabels =
cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels,
labelManager);
// set maximum capacity by labels
maxCapacityByNodeLabels =
cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
accessibleLabels, labelManager);
this.csContext = cs;
// initialize ResourceUsage
queueUsage = new ResourceUsage();
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
authorizer = YarnAuthorizationProvider.getInstance(cs.getConf());
// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);
}
protected void setupConfigurableCapacities() {
CSQueueUtils.loadUpdateAndCheckCapacities(
getQueuePath(),
accessibleLabels,
csContext.getConfiguration(),
queueCapacities,
parent == null ? null : parent.getQueueCapacities(),
csContext.getRMContext().getNodeLabelManager());
}
@Override
public synchronized float getCapacity() {
return capacity;
return queueCapacities.getCapacity();
}
@Override
public synchronized float getAbsoluteCapacity() {
return absoluteCapacity;
return queueCapacities.getAbsoluteCapacity();
}
@Override
public float getAbsoluteMaximumCapacity() {
return absoluteMaxCapacity;
return queueCapacities.getAbsoluteMaximumCapacity();
}
@Override
public synchronized float getAbsoluteUsedCapacity() {
return absoluteUsedCapacity;
return queueCapacities.getAbsoluteUsedCapacity();
}
@Override
public float getMaximumCapacity() {
return maximumCapacity;
return queueCapacities.getMaximumCapacity();
}
@Override
public synchronized float getUsedCapacity() {
return usedCapacity;
return queueCapacities.getUsedCapacity();
}
@Override
@ -216,12 +193,12 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
@Override
public synchronized void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity;
queueCapacities.setUsedCapacity(usedCapacity);
}
@Override
public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
this.absoluteUsedCapacity = absUsedCapacity;
queueCapacities.setAbsoluteUsedCapacity(absUsedCapacity);
}
/**
@ -230,21 +207,16 @@ public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
*/
synchronized void setMaxCapacity(float maximumCapacity) {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(), maximumCapacity);
float absMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
queueCapacities.getAbsoluteCapacity(),
absMaxCapacity);
this.maximumCapacity = maximumCapacity;
this.absoluteMaxCapacity = absMaxCapacity;
}
@Override
public float getAbsActualCapacity() {
// for now, simply return actual capacity = guaranteed capacity for parent
// queue
return absoluteCapacity;
queueCapacities.setMaximumCapacity(maximumCapacity);
queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
}
@Override
@ -252,39 +224,39 @@ public String getDefaultNodeLabelExpression() {
return defaultLabelExpression;
}
synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<AccessType, AccessControlList> acls,
Set<String> labels, String defaultLabelExpression,
Map<String, Float> nodeLabelCapacities,
Map<String, Float> maximumNodeLabelCapacities,
boolean reservationContinueLooking, Resource maxAllocation)
synchronized void setupQueueConfigs(Resource clusterResource)
throws IOException {
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
absoluteMaxCapacity);
// get labels
this.accessibleLabels =
csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
this.defaultLabelExpression = csContext.getConfiguration()
.getDefaultNodeLabelExpression(getQueuePath());
this.capacity = capacity;
this.absoluteCapacity = absoluteCapacity;
this.maximumCapacity = maximumCapacity;
this.absoluteMaxCapacity = absoluteMaxCapacity;
this.state = state;
this.acls = acls;
// inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
this.accessibleLabels = parent.getAccessibleNodeLabels();
}
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
this.accessibleLabels);
// set labels
this.accessibleLabels = labels;
// inherit from parent if labels not set
if (this.defaultLabelExpression == null && parent != null
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
}
// After we setup labels, we can setup capacities
setupConfigurableCapacities();
// set label expression
this.defaultLabelExpression = defaultLabelExpression;
this.minimumAllocation = csContext.getMinimumResourceCapability();
this.maximumAllocation =
csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath());
// copy node label capacity
this.capacitiyByNodeLabels = new HashMap<String, Float>(nodeLabelCapacities);
this.maxCapacityByNodeLabels =
new HashMap<String, Float>(maximumNodeLabelCapacities);
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
this.state = csContext.getConfiguration().getState(getQueuePath());
this.acls = csContext.getConfiguration().getAcls(getQueuePath());
// Update metrics
CSQueueUtils.updateQueueStatistics(
@ -311,34 +283,19 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
}
}
}
// calculate absolute capacity by each node label
this.absoluteCapacityByNodeLabels =
CSQueueUtils.computeAbsoluteCapacityByNodeLabels(
this.capacitiyByNodeLabels, parent);
// calculate maximum capacity by each node label
this.absoluteMaxCapacityByNodeLabels =
CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels(
maximumNodeLabelCapacities, parent);
// check absoluteMaximumNodeLabelCapacities is valid
CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(),
absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels);
this.reservationsContinueLooking = reservationContinueLooking;
this.reservationsContinueLooking = csContext.getConfiguration()
.getReservationContinueLook();
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
this.maximumAllocation = maxAllocation;
}
protected QueueInfo getQueueInfo() {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setAccessibleNodeLabels(accessibleLabels);
queueInfo.setCapacity(capacity);
queueInfo.setMaximumCapacity(maximumCapacity);
queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
queueInfo.setQueueState(state);
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
@ -388,51 +345,6 @@ protected synchronized void releaseResource(Resource clusterResource,
--numContainers;
}
@Private
public float getCapacityByNodeLabel(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
if (null == parent) {
return 1f;
}
return getCapacity();
}
if (!capacitiyByNodeLabels.containsKey(label)) {
return 0f;
} else {
return capacitiyByNodeLabels.get(label);
}
}
@Private
public float getAbsoluteCapacityByNodeLabel(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
if (null == parent) {
return 1f;
}
return getAbsoluteCapacity();
}
if (!absoluteCapacityByNodeLabels.containsKey(label)) {
return 0f;
} else {
return absoluteCapacityByNodeLabels.get(label);
}
}
@Private
public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
return getAbsoluteMaximumCapacity();
}
if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
return 0f;
} else {
return absoluteMaxCapacityByNodeLabels.get(label);
}
}
@Private
public boolean getReservationContinueLooking() {
return reservationsContinueLooking;
@ -442,21 +354,21 @@ public boolean getReservationContinueLooking() {
public Map<AccessType, AccessControlList> getACLs() {
return acls;
}
@Private
public Resource getUsedResourceByLabel(String nodeLabel) {
return queueUsage.getUsed(nodeLabel);
}
@VisibleForTesting
public ResourceUsage getResourceUsage() {
return queueUsage;
}
@Private
public boolean getPreemptionDisabled() {
return preemptionDisabled;
}
@Private
public QueueCapacities getQueueCapacities() {
return queueCapacities;
}
@Private
public ResourceUsage getQueueResourceUsage() {
return queueUsage;
}
/**
* The specified queue is preemptable if system-wide preemption is turned on

View File

@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@ -75,15 +76,6 @@ public interface CSQueue
* @return configured queue capacity
*/
public float getCapacity();
/**
* Get actual <em>capacity</em> of the queue, this may be different from
* configured capacity when mis-config take place, like add labels to the
* cluster
*
* @return actual queue capacity
*/
public float getAbsActualCapacity();
/**
* Get capacity of the parent of the queue as a function of the
@ -143,14 +135,6 @@ public interface CSQueue
*/
public Resource getUsedResources();
/**
* Get the currently utilized resources which allocated at nodes with label
* specified
*
* @return used resources by the queue and it's children
*/
public Resource getUsedResourceByLabel(String nodeLabel);
/**
* Get the current run-state of the queue
* @return current run-state
@ -279,31 +263,22 @@ public void detachContainer(Resource clusterResource,
*/
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer container);
/**
* Get absolute capacity by label of this queue can use
* @param nodeLabel
* @return absolute capacity by label of this queue can use
*/
public float getAbsoluteCapacityByNodeLabel(String nodeLabel);
/**
* Get absolute max capacity by label of this queue can use
* @param nodeLabel
* @return absolute capacity by label of this queue can use
*/
public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel);
/**
* Get capacity by node label
* @param nodeLabel
* @return capacity by node label
*/
public float getCapacityByNodeLabel(String nodeLabel);
/**
* Check whether <em>disable_preemption</em> property is set for this queue
* @return true if <em>disable_preemption</em> is set, false if not
*/
public boolean getPreemptionDisabled();
/**
* Get QueueCapacities of this queue
* @return queueCapacities
*/
public QueueCapacities getQueueCapacities();
/**
* Get ResourceUsage of this queue
* @return resourceUsage
*/
public ResourceUsage getQueueResourceUsage();
}

View File

@ -17,13 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -34,6 +35,9 @@ class CSQueueUtils {
final static float EPSILON = 0.0001f;
/*
* Used only by tests
*/
public static void checkMaxCapacity(String queueName,
float capacity, float maximumCapacity) {
if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) {
@ -43,6 +47,9 @@ public static void checkMaxCapacity(String queueName,
}
}
/*
* Used only by tests
*/
public static void checkAbsoluteCapacity(String queueName,
float absCapacity, float absMaxCapacity) {
if (absMaxCapacity < (absCapacity - EPSILON)) {
@ -53,19 +60,33 @@ public static void checkAbsoluteCapacity(String queueName,
}
}
public static void checkAbsoluteCapacitiesByLabel(String queueName,
Map<String, Float> absCapacities,
Map<String, Float> absMaximumCapacities) {
for (Entry<String, Float> entry : absCapacities.entrySet()) {
String label = entry.getKey();
float absCapacity = entry.getValue();
float absMaxCapacity = absMaximumCapacities.get(label);
if (absMaxCapacity < (absCapacity - EPSILON)) {
throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
+ "Queue '" + queueName + "' has " + "an absolute capacity ("
+ absCapacity + ") greater than "
+ "its absolute maximumCapacity (" + absMaxCapacity + ") of label="
+ label);
/**
* Check sanity of capacities:
* - capacity <= maxCapacity
* - absCapacity <= absMaximumCapacity
*/
private static void capacitiesSanityCheck(String queueName,
QueueCapacities queueCapacities) {
for (String label : queueCapacities.getExistingNodeLabels()) {
float capacity = queueCapacities.getCapacity(label);
float maximumCapacity = queueCapacities.getMaximumCapacity(label);
if (capacity > maximumCapacity) {
throw new IllegalArgumentException("Illegal queue capacity setting, "
+ "(capacity=" + capacity + ") > (maximum-capacity="
+ maximumCapacity + "). When label=[" + label + "]");
}
// Actually, this may not needed since we have verified capacity <=
// maximumCapacity. And the way we compute absolute capacity (abs(x) =
// cap(x) * cap(x.parent) * ...) is a monotone increasing function. But
// just keep it here to make sure our compute abs capacity method works
// correctly.
float absCapacity = queueCapacities.getAbsoluteCapacity(label);
float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label);
if (absCapacity > absMaxCapacity) {
throw new IllegalArgumentException("Illegal queue capacity setting, "
+ "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity="
+ absMaxCapacity + "). When label=[" + label + "]");
}
}
}
@ -77,37 +98,94 @@ public static float computeAbsoluteMaximumCapacity(
return (parentAbsMaxCapacity * maximumCapacity);
}
public static Map<String, Float> computeAbsoluteCapacityByNodeLabels(
Map<String, Float> nodeLabelToCapacities, CSQueue parent) {
if (parent == null) {
return nodeLabelToCapacities;
}
Map<String, Float> absoluteCapacityByNodeLabels =
new HashMap<String, Float>();
for (Entry<String, Float> entry : nodeLabelToCapacities.entrySet()) {
String label = entry.getKey();
float capacity = entry.getValue();
absoluteCapacityByNodeLabels.put(label,
capacity * parent.getAbsoluteCapacityByNodeLabel(label));
}
return absoluteCapacityByNodeLabels;
/**
* This method intends to be used by ReservationQueue, ReservationQueue will
* not appear in configuration file, so we shouldn't do load capacities
* settings in configuration for reservation queue.
*/
public static void updateAndCheckCapacitiesByLabel(String queuePath,
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
capacitiesSanityCheck(queuePath, queueCapacities);
}
/**
* Do following steps for capacities
* - Load capacities from configuration
* - Update absolute capacities for new capacities
* - Check if capacities/absolute-capacities legal
*/
public static void loadUpdateAndCheckCapacities(String queuePath,
Set<String> accessibleLabels, CapacitySchedulerConfiguration csConf,
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities,
RMNodeLabelsManager nlm) {
loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels, nlm,
queueCapacities, csConf);
updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
capacitiesSanityCheck(queuePath, queueCapacities);
}
public static Map<String, Float> computeAbsoluteMaxCapacityByNodeLabels(
Map<String, Float> maximumNodeLabelToCapacities, CSQueue parent) {
if (parent == null) {
return maximumNodeLabelToCapacities;
// Considered NO_LABEL, ANY and null cases
private static Set<String> normalizeAccessibleNodeLabels(Set<String> labels,
RMNodeLabelsManager mgr) {
Set<String> accessibleLabels = new HashSet<String>();
if (labels != null) {
accessibleLabels.addAll(labels);
}
Map<String, Float> absoluteMaxCapacityByNodeLabels =
new HashMap<String, Float>();
for (Entry<String, Float> entry : maximumNodeLabelToCapacities.entrySet()) {
String label = entry.getKey();
float maxCapacity = entry.getValue();
absoluteMaxCapacityByNodeLabels.put(label,
maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label));
if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) {
accessibleLabels.addAll(mgr.getClusterNodeLabels());
}
accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL);
return accessibleLabels;
}
private static void loadCapacitiesByLabelsFromConf(String queuePath,
Set<String> labels, RMNodeLabelsManager mgr,
QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
queueCapacities.clearConfigurableFields();
labels = normalizeAccessibleNodeLabels(labels, mgr);
for (String label : labels) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL,
csConf.getNonLabeledQueueCapacity(queuePath) / 100);
queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL,
csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100);
} else {
queueCapacities.setCapacity(label,
csConf.getLabeledQueueCapacity(queuePath, label) / 100);
queueCapacities.setMaximumCapacity(label,
csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100);
}
}
}
// Set absolute capacities for {capacity, maximum-capacity}
private static void updateAbsoluteCapacitiesByNodeLabels(
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
for (String label : queueCapacities.getExistingNodeLabels()) {
float capacity = queueCapacities.getCapacity(label);
if (capacity > 0f) {
queueCapacities.setAbsoluteCapacity(
label,
capacity
* (parentQueueCapacities == null ? 1 : parentQueueCapacities
.getAbsoluteCapacity(label)));
}
float maxCapacity = queueCapacities.getMaximumCapacity(label);
if (maxCapacity > 0f) {
queueCapacities.setAbsoluteMaximumCapacity(
label,
maxCapacity
* (parentQueueCapacities == null ? 1 : parentQueueCapacities
.getAbsoluteMaximumCapacity(label)));
}
}
return absoluteMaxCapacityByNodeLabels;
}
@Lock(CSQueue.class)

View File

@ -278,6 +278,9 @@ static String getQueuePrefix(String queue) {
}
private String getNodeLabelPrefix(String queue, String label) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
return getQueuePrefix(queue);
}
return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
}
@ -316,7 +319,7 @@ public float getMaximumApplicationMasterResourcePerQueuePercent(String queue) {
getMaximumApplicationMasterResourcePercent());
}
public float getCapacity(String queue) {
public float getNonLabeledQueueCapacity(String queue) {
float capacity = queue.equals("root") ? 100.0f : getFloat(
getQueuePrefix(queue) + CAPACITY, UNDEFINED);
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
@ -338,7 +341,7 @@ public void setCapacity(String queue, float capacity) {
", capacity=" + capacity);
}
public float getMaximumCapacity(String queue) {
public float getNonLabeledQueueMaximumCapacity(String queue) {
float maxCapacity = getFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ?
@ -442,57 +445,29 @@ public Set<String> getAccessibleNodeLabels(String queue) {
return Collections.unmodifiableSet(set);
}
public Map<String, Float> getNodeLabelCapacities(String queue,
Set<String> labels, RMNodeLabelsManager mgr) {
Map<String, Float> nodeLabelCapacities = new HashMap<String, Float>();
if (labels == null) {
return nodeLabelCapacities;
private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
float defaultValue) {
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
float capacity = getFloat(capacityPropertyName, defaultValue);
if (capacity < MINIMUM_CAPACITY_VALUE
|| capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal capacity of " + capacity
+ " for node-label=" + label + " in queue=" + queue
+ ", valid capacity should in range of [0, 100].");
}
for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
.getClusterNodeLabels() : labels) {
String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
float capacity = getFloat(capacityPropertyName, 0f);
if (capacity < MINIMUM_CAPACITY_VALUE
|| capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal capacity of " + capacity
+ " for node-label=" + label + " in queue=" + queue
+ ", valid capacity should in range of [0, 100].");
}
if (LOG.isDebugEnabled()) {
LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
}
nodeLabelCapacities.put(label, capacity / 100f);
if (LOG.isDebugEnabled()) {
LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
}
return nodeLabelCapacities;
return capacity;
}
public Map<String, Float> getMaximumNodeLabelCapacities(String queue,
Set<String> labels, RMNodeLabelsManager mgr) {
Map<String, Float> maximumNodeLabelCapacities = new HashMap<String, Float>();
if (labels == null) {
return maximumNodeLabelCapacities;
}
for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
.getClusterNodeLabels() : labels) {
float maxCapacity =
getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
100f);
if (maxCapacity < MINIMUM_CAPACITY_VALUE
|| maxCapacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal " + "capacity of "
+ maxCapacity + " for label=" + label + " in queue=" + queue);
}
LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity);
maximumNodeLabelCapacities.put(label, maxCapacity / 100f);
}
return maximumNodeLabelCapacities;
public float getLabeledQueueCapacity(String queue, String label) {
return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f);
}
public float getLabeledQueueMaximumCapacity(String queue, String label) {
return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f);
}
public String getDefaultNodeLabelExpression(String queue) {

View File

@ -122,49 +122,7 @@ public LeafQueue(CapacitySchedulerContext cs,
super(cs, queueName, parent, old);
this.scheduler = cs;
this.activeUsersManager = new ActiveUsersManager(metrics);
this.minimumAllocationFactor =
Resources.ratio(resourceCalculator,
Resources.subtract(maximumAllocation, minimumAllocation),
maximumAllocation);
float capacity = getCapacityFromConf();
float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
float maximumCapacity =
(float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
float absoluteMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
float userLimitFactor =
cs.getConfiguration().getUserLimitFactor(getQueuePath());
int maxApplications =
cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
maxApplications = (int)(maxSystemApps * absoluteCapacity);
}
maxApplicationsPerUser =
(int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
float maxAMResourcePerQueuePercent = cs.getConfiguration()
.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
QueueState state = cs.getConfiguration().getState(getQueuePath());
Map<AccessType, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
defaultLabelExpression, this.capacitiyByNodeLabels,
this.maxCapacityByNodeLabels,
cs.getConfiguration().getReservationContinueLook(),
cs.getConfiguration().getMaximumAllocationPerQueue(getQueuePath()));
this.activeUsersManager = new ActiveUsersManager(metrics);
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
@ -176,35 +134,13 @@ public LeafQueue(CapacitySchedulerContext cs,
this.pendingApplications =
new TreeSet<FiCaSchedulerApp>(applicationComparator);
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}
// externalizing in method, to allow overriding
protected float getCapacityFromConf() {
return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
setupQueueConfigs(cs.getClusterResource());
}
protected synchronized void setupQueueConfigs(
Resource clusterResource,
float capacity, float absoluteCapacity,
float maximumCapacity, float absoluteMaxCapacity,
int userLimit, float userLimitFactor,
int maxApplications, float maxAMResourcePerQueuePercent,
int maxApplicationsPerUser, QueueState state,
Map<AccessType, AccessControlList> acls, int nodeLocalityDelay,
Set<String> labels, String defaultLabelExpression,
Map<String, Float> capacitieByLabel,
Map<String, Float> maximumCapacitiesByLabel,
boolean revervationContinueLooking,
Resource maxAllocation) throws IOException {
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls, labels,
defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
revervationContinueLooking, maxAllocation);
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
absoluteMaxCapacity);
protected synchronized void setupQueueConfigs(Resource clusterResource)
throws IOException {
super.setupQueueConfigs(clusterResource);
this.lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
@ -214,16 +150,24 @@ protected synchronized void setupQueueConfigs(
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
updateHeadroomInfo(clusterResource,
queueCapacities.getAbsoluteMaximumCapacity());
this.absoluteCapacity = absCapacity;
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
userLimit = conf.getUserLimit(getQueuePath());
userLimitFactor = conf.getUserLimitFactor(getQueuePath());
this.userLimit = userLimit;
this.userLimitFactor = userLimitFactor;
this.maxApplications = maxApplications;
this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
this.maxApplicationsPerUser = maxApplicationsPerUser;
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxSystemApps = conf.getMaximumSystemApplications();
maxApplications =
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
}
maxApplicationsPerUser =
(int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression)) {
@ -239,7 +183,7 @@ protected synchronized void setupQueueConfigs(
getAccessibleNodeLabels().iterator(), ',')));
}
this.nodeLocalityDelay = nodeLocalityDelay;
nodeLocalityDelay = conf.getNodeLocalityDelay();
// re-init this since max allocation could have changed
this.minimumAllocationFactor =
@ -253,21 +197,21 @@ protected synchronized void setupQueueConfigs(
}
StringBuilder labelStrBuilder = new StringBuilder();
if (labels != null) {
for (String s : labels) {
if (accessibleLabels != null) {
for (String s : accessibleLabels) {
labelStrBuilder.append(s);
labelStrBuilder.append(",");
}
}
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
"capacity = " + queueCapacities.getCapacity() +
" [= (float) configuredCapacity / 100 ]" + "\n" +
"asboluteCapacity = " + absoluteCapacity +
"asboluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
"maxCapacity = " + maximumCapacity +
"maxCapacity = " + queueCapacities.getMaximumCapacity() +
" [= configuredMaxCapacity ]" + "\n" +
"absoluteMaxCapacity = " + absoluteMaxCapacity +
"absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() +
" [= 1.0 maximumCapacity undefined, " +
"(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" +
"\n" +
@ -282,7 +226,7 @@ protected synchronized void setupQueueConfigs(
"maxApplicationsPerUser = " + maxApplicationsPerUser +
" [= (int)(maxApplications * (userLimit / 100.0f) * " +
"userLimitFactor) ]" + "\n" +
"usedCapacity = " + usedCapacity +
"usedCapacity = " + queueCapacities.getUsedCapacity() +
" [= usedResourcesMemory / " +
"(clusterResourceMemory * absoluteCapacity)]" + "\n" +
"absoluteUsedCapacity = " + absoluteUsedCapacity +
@ -441,8 +385,8 @@ public int getNodeLocalityDelay() {
public String toString() {
return queueName + ": " +
"capacity=" + capacity + ", " +
"absoluteCapacity=" + absoluteCapacity + ", " +
"capacity=" + queueCapacities.getCapacity() + ", " +
"absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
"usedResources=" + queueUsage.getUsed() + ", " +
"usedCapacity=" + getUsedCapacity() + ", " +
"absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
@ -505,23 +449,7 @@ public synchronized void reinitialize(
+ ", trying to set it to: " + newMax);
}
setupQueueConfigs(
clusterResource,
newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
newlyParsedLeafQueue.maximumCapacity,
newlyParsedLeafQueue.absoluteMaxCapacity,
newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor,
newlyParsedLeafQueue.maxApplications,
newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay(),
newlyParsedLeafQueue.accessibleLabels,
newlyParsedLeafQueue.defaultLabelExpression,
newlyParsedLeafQueue.capacitiyByNodeLabels,
newlyParsedLeafQueue.maxCapacityByNodeLabels,
newlyParsedLeafQueue.reservationsContinueLooking,
newlyParsedLeafQueue.getMaximumAllocation());
setupQueueConfigs(clusterResource);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@ -1034,7 +962,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
application.getCurrentReservation()),
labelManager.getResourceByLabel(label, clusterResource));
if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
if (potentialNewWithoutReservedCapacity <= queueCapacities
.getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: "
+ getQueueName()
@ -1048,8 +977,9 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(), clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: "
+ potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")");
+ potentialNewWithoutReservedCapacity + " ( "
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
@ -1058,7 +988,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) {
if (potentialNewCapacity > queueCapacities
.getAbsoluteMaximumCapacity(label) + 1e-4) {
canAssign = false;
break;
}
@ -1073,7 +1004,8 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
+ " max-capacity: " + absoluteMaxCapacity + ")");
+ " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
+ ")");
}
}
@ -1162,7 +1094,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
Resources.multiplyAndNormalizeUp(resourceCalculator,
labelManager.getResourceByLabel(firstLabel,
clusterResource),
getAbsoluteCapacityByNodeLabel(firstLabel),
queueCapacities.getAbsoluteCapacity(firstLabel),
minimumAllocation));
} else {
// else there's no label on request, just to use absolute capacity as
@ -1170,7 +1102,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application,
queueCapacity =
Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
absoluteCapacity, minimumAllocation);
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
// Allow progress for queues with miniscule capacity
@ -1815,12 +1747,9 @@ synchronized void releaseResource(Resource clusterResource,
}
private void updateAbsoluteCapacityResource(Resource clusterResource) {
absoluteCapacityResource = Resources.multiplyAndNormalizeUp(
resourceCalculator,
clusterResource,
absoluteCapacity, minimumAllocation);
absoluteCapacityResource =
Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
@Override
@ -1831,7 +1760,8 @@ public synchronized void updateClusterResource(Resource clusterResource) {
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
updateHeadroomInfo(clusterResource,
queueCapacities.getAbsoluteMaximumCapacity());
// Update metrics
CSQueueUtils.updateQueueStatistics(
@ -2004,35 +1934,13 @@ public void detachContainer(Resource clusterResource,
getParent().detachContainer(clusterResource, application, rmContainer);
}
}
@Override
public float getAbsActualCapacity() {
//? Is this actually used by anything at present?
// There is a findbugs warning -re lastClusterResource (now excluded),
// when this is used, verify that the access is mt correct and remove
// the findbugs exclusion if possible
if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
lastClusterResource, Resources.none())) {
return absoluteCapacity;
}
Resource resourceRespectLabels =
labelManager == null ? lastClusterResource : labelManager
.getQueueResource(queueName, accessibleLabels, lastClusterResource);
float absActualCapacity =
Resources.divide(resourceCalculator, lastClusterResource,
resourceRespectLabels, lastClusterResource);
return absActualCapacity > absoluteCapacity ? absoluteCapacity
: absActualCapacity;
}
public void setCapacity(float capacity) {
this.capacity = capacity;
queueCapacities.setCapacity(capacity);
}
public void setAbsoluteCapacity(float absoluteCapacity) {
this.absoluteCapacity = absoluteCapacity;
queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
public void setMaxApplications(int maxApplications) {

View File

@ -87,7 +87,7 @@ public ParentQueue(CapacitySchedulerContext cs,
this.rootQueue = (parent == null);
float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());
if (rootQueue &&
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
@ -95,46 +95,20 @@ public ParentQueue(CapacitySchedulerContext cs,
"capacity of " + rawCapacity + " for queue " + queueName +
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
float capacity = (float) rawCapacity / 100;
float parentAbsoluteCapacity =
(rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
float absoluteCapacity = parentAbsoluteCapacity * capacity;
float maximumCapacity =
(float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
float absoluteMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
QueueState state = cs.getConfiguration().getState(getQueuePath());
Map<AccessType, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels,
cs.getConfiguration().getReservationContinueLook());
this.childQueues = new TreeSet<CSQueue>(queueComparator);
setupQueueConfigs(cs.getClusterResource());
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
", fullname=" + getQueuePath());
}
synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<AccessType, AccessControlList> acls,
Set<String> accessibleLabels, String defaultLabelExpression,
Map<String, Float> nodeLabelCapacities,
Map<String, Float> maximumCapacitiesByLabel,
boolean reservationContinueLooking) throws IOException {
super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
reservationContinueLooking, maximumAllocation);
StringBuilder aclsString = new StringBuilder();
synchronized void setupQueueConfigs(Resource clusterResource)
throws IOException {
super.setupQueueConfigs(clusterResource);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
@ -148,10 +122,10 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
}
LOG.info(queueName +
", capacity=" + capacity +
", asboluteCapacity=" + absoluteCapacity +
", maxCapacity=" + maximumCapacity +
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", capacity=" + this.queueCapacities.getCapacity() +
", asboluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
", asboluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
", state=" + state +
", acls=" + aclsString +
", labels=" + labelStrBuilder.toString() + "\n" +
@ -167,19 +141,19 @@ synchronized void setChildQueues(Collection<CSQueue> childQueues) {
}
float delta = Math.abs(1.0f - childCapacities); // crude way to check
// allow capacities being set to 0, and enforce child 0 if parent is 0
if (((capacity > 0) && (delta > PRECISION)) ||
((capacity == 0) && (childCapacities > 0))) {
if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) ||
((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException("Illegal" +
" capacity of " + childCapacities +
" for children of queue " + queueName);
}
// check label capacities
for (String nodeLabel : labelManager.getClusterNodeLabels()) {
float capacityByLabel = getCapacityByNodeLabel(nodeLabel);
float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
// check children's labels
float sum = 0;
for (CSQueue queue : childQueues) {
sum += queue.getCapacityByNodeLabel(nodeLabel);
sum += queue.getQueueCapacities().getCapacity(nodeLabel);
}
if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
|| (capacityByLabel == 0) && (sum > 0)) {
@ -255,8 +229,8 @@ public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
public String toString() {
return queueName + ": " +
"numChildQueue= " + childQueues.size() + ", " +
"capacity=" + capacity + ", " +
"absoluteCapacity=" + absoluteCapacity + ", " +
"capacity=" + queueCapacities.getCapacity() + ", " +
"absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
"usedResources=" + queueUsage.getUsed() +
"usedCapacity=" + getUsedCapacity() + ", " +
"numApps=" + getNumApplications() + ", " +
@ -264,9 +238,8 @@ public String toString() {
}
@Override
public synchronized void reinitialize(
CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException {
public synchronized void reinitialize(CSQueue newlyParsedQueue,
Resource clusterResource) throws IOException {
// Sanity check
if (!(newlyParsedQueue instanceof ParentQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
@ -277,18 +250,7 @@ public synchronized void reinitialize(
ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
// Set new configs
setupQueueConfigs(clusterResource,
newlyParsedParentQueue.capacity,
newlyParsedParentQueue.absoluteCapacity,
newlyParsedParentQueue.maximumCapacity,
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls,
newlyParsedParentQueue.accessibleLabels,
newlyParsedParentQueue.defaultLabelExpression,
newlyParsedParentQueue.capacitiyByNodeLabels,
newlyParsedParentQueue.maxCapacityByNodeLabels,
newlyParsedParentQueue.reservationsContinueLooking);
setupQueueConfigs(clusterResource);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@ -513,7 +475,7 @@ private synchronized boolean canAssignToThisQueue(Resource clusterResource,
labelManager.getResourceByLabel(label, clusterResource));
// if any of the label doesn't beyond limit, we can allocate on this node
if (currentAbsoluteLabelUsedCapacity >=
getAbsoluteMaximumCapacityByNodeLabel(label)) {
queueCapacities.getAbsoluteMaximumCapacity(label)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
+ " current-capacity (" + queueUsage.getUsed(label) + ") "
@ -541,7 +503,8 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource)
Resources.subtract(queueUsage.getUsed(), reservedResources),
clusterResource);
if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
if (capacityWithoutReservedCapacity <= queueCapacities
.getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed().getMemory()
@ -551,7 +514,7 @@ private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource)
/ clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
+ absoluteMaxCapacity + ")");
+ queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
@ -761,13 +724,6 @@ public void detachContainer(Resource clusterResource,
}
}
}
@Override
public float getAbsActualCapacity() {
// for now, simply return actual capacity = guaranteed capacity for parent
// queue
return absoluteCapacity;
}
public synchronized int getNumApplications() {
return numApplications;

View File

@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.Resource;
@ -99,16 +97,7 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
}
// Set new configs
setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(),
newlyParsedParentQueue.getAbsoluteCapacity(),
newlyParsedParentQueue.getMaximumCapacity(),
newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(),
newlyParsedParentQueue.accessibleLabels,
newlyParsedParentQueue.defaultLabelExpression,
newlyParsedParentQueue.capacitiyByNodeLabels,
newlyParsedParentQueue.maxCapacityByNodeLabels,
newlyParsedParentQueue.getReservationContinueLooking());
setupQueueConfigs(clusterResource);
updateQuotas(newlyParsedParentQueue.userLimit,
newlyParsedParentQueue.userLimitFactor,

View File

@ -19,12 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import com.google.common.collect.Sets;
public class QueueCapacities {
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
@ -32,13 +38,15 @@ public class QueueCapacities {
private Map<String, Capacities> capacitiesMap;
private ReadLock readLock;
private WriteLock writeLock;
private final boolean isRoot;
public QueueCapacities() {
public QueueCapacities(boolean isRoot) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
capacitiesMap = new HashMap<String, Capacities>();
this.isRoot = isRoot;
}
// Usage enum here to make implement cleaner
@ -58,6 +66,18 @@ private static class Capacities {
public Capacities() {
capacitiesArr = new float[CapacityType.values().length];
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{used=" + capacitiesArr[0] + "%, ");
sb.append("abs_used=" + capacitiesArr[1] + "%, ");
sb.append("max_cap=" + capacitiesArr[2] + "%, ");
sb.append("abs_max_cap=" + capacitiesArr[3] + "%, ");
sb.append("cap=" + capacitiesArr[4] + "%, ");
sb.append("abs_cap=" + capacitiesArr[5] + "%}");
return sb.toString();
}
}
private float _get(String label, CapacityType type) {
@ -127,6 +147,10 @@ public float getCapacity() {
}
public float getCapacity(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL) && isRoot) {
return 1f;
}
return _get(label, CapacityType.CAP);
}
@ -144,6 +168,9 @@ public float getAbsoluteCapacity() {
}
public float getAbsoluteCapacity(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL) && isRoot) {
return 1f;
}
return _get(label, CapacityType.ABS_CAP);
}
@ -188,4 +215,43 @@ public void setAbsoluteMaximumCapacity(float value) {
public void setAbsoluteMaximumCapacity(String label, float value) {
_set(label, CapacityType.ABS_MAX_CAP, value);
}
/**
* Clear configurable fields, like
* (absolute)capacity/(absolute)maximum-capacity, this will be used by queue
* reinitialize, when we reinitialize a queue, we will first clear all
* configurable fields, and load new values
*/
public void clearConfigurableFields() {
try {
writeLock.lock();
for (String label : capacitiesMap.keySet()) {
_set(label, CapacityType.CAP, 0);
_set(label, CapacityType.MAX_CAP, 0);
_set(label, CapacityType.ABS_CAP, 0);
_set(label, CapacityType.ABS_MAX_CAP, 0);
}
} finally {
writeLock.unlock();
}
}
public Set<String> getExistingNodeLabels() {
try {
readLock.lock();
return new HashSet<String>(capacitiesMap.keySet());
} finally {
readLock.unlock();
}
}
@Override
public String toString() {
try {
readLock.lock();
return this.capacitiesMap.toString();
} finally {
readLock.unlock();
}
}
}

View File

@ -62,11 +62,11 @@ public synchronized void reinitialize(CSQueue newlyParsedQueue,
throw new IOException("Trying to reinitialize " + getQueuePath()
+ " from " + newlyParsedQueue.getQueuePath());
}
super.reinitialize(newlyParsedQueue, clusterResource);
CSQueueUtils.updateQueueStatistics(
parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
parent, parent.schedulerContext.getClusterResource(),
parent.schedulerContext.getMinimumResourceCapability());
super.reinitialize(newlyParsedQueue, clusterResource);
updateQuotas(parent.getUserLimitForReservation(),
parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(),
@ -108,9 +108,9 @@ private void updateQuotas(int userLimit, float userLimitFactor,
maxApplicationsPerUser = maxAppsPerUserForReservation;
}
// used by the super constructor, we initialize to zero
protected float getCapacityFromConf() {
return 0f;
@Override
protected void setupConfigurableCapacities() {
CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(),
queueCapacities, parent == null ? null : parent.getQueueCapacities());
}
}

View File

@ -202,33 +202,33 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
LOG.info("t2 l2q2 " + result);
//some usage, but below the base capacity
root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.4f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//usage gt base on parent sibling
root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//same as last, but with usage also on direct parent
root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//add to direct sibling, below the threshold of effect at present
root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
@ -236,9 +236,9 @@ public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
//add to direct sibling, now above the threshold of effect
//(it's cumulative with prior tests)
root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.1f, result, 0.000001f);

View File

@ -389,11 +389,11 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
public void testMaximumCapacitySetup() {
float delta = 0.0000001f;
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta);
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
conf.setMaximumCapacity(A, 50.0f);
assertEquals(50.0f, conf.getMaximumCapacity(A),delta);
assertEquals(50.0f, conf.getNonLabeledQueueMaximumCapacity(A),delta);
conf.setMaximumCapacity(A, -1);
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta);
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
}

View File

@ -91,7 +91,8 @@ private void checkUsedResource(MockRM rm, String queueName, int memory,
String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue(queueName);
Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
Assert.assertEquals(memory, queue.getQueueResourceUsage().getUsed(label)
.getMemory());
}
@Test (timeout = 30000)

View File

@ -99,7 +99,7 @@ private static float executeByName(QueueCapacities obj, String methodName,
}
private void internalTestModifyAndRead(String label) throws Exception {
QueueCapacities qc = new QueueCapacities();
QueueCapacities qc = new QueueCapacities(false);
// First get returns 0 always
Assert.assertEquals(0f, get(qc, suffix, label), 1e-8);

View File

@ -468,18 +468,18 @@ private void checkQueueLabels(CapacityScheduler capacityScheduler) {
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
Assert.assertEquals(0.5, qA2.getCapacityByNodeLabel("red"), DELTA);
Assert.assertEquals(0.5, qA2.getQueueCapacities().getCapacity("red"), DELTA);
Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.25, qA2.getAbsoluteCapacityByNodeLabel("red"), DELTA);
Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(0.3, qA2.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
Assert.assertEquals(0.125, qB3.getAbsoluteCapacityByNodeLabel("red"), DELTA);
Assert.assertEquals(0.125, qB3.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
Assert.assertEquals(1, qB3.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
}
private void
@ -691,8 +691,8 @@ public void testQueueParsingWithUnusedLabels() throws IOException {
// check root queue's capacity by label -- they should be all zero
CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
Assert.assertEquals(0, root.getCapacityByNodeLabel("red"), DELTA);
Assert.assertEquals(0, root.getCapacityByNodeLabel("blue"), DELTA);
Assert.assertEquals(0, root.getQueueCapacities().getCapacity("red"), DELTA);
Assert.assertEquals(0, root.getQueueCapacities().getCapacity("blue"), DELTA);
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);

View File

@ -395,9 +395,9 @@ private void verifySubQueueGeneric(String q, QueueInfo info,
String qshortName = qArr[qArr.length - 1];
assertEquals("usedCapacity doesn't match", 0, info.usedCapacity, 1e-3f);
assertEquals("capacity doesn't match", csConf.getCapacity(q),
assertEquals("capacity doesn't match", csConf.getNonLabeledQueueCapacity(q),
info.capacity, 1e-3f);
float expectCapacity = csConf.getMaximumCapacity(q);
float expectCapacity = csConf.getNonLabeledQueueMaximumCapacity(q);
float expectAbsMaxCapacity = parentAbsMaxCapacity * (info.maxCapacity/100);
if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) {
expectCapacity = 100;