diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2c654dc9bca..a6bbdf38146 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -189,6 +189,9 @@ Release 2.6.1 - UNRELEASED
YARN-2694. Ensure only single node label specified in ResourceRequest.
(Wangda Tan via jianhe)
+ YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track
+ capacities-by-label. (Wangda Tan via jianhe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 971acea7da6..ece5cff81d3 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -204,6 +204,8 @@
+
+
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 3c1663fbb6b..52629b3a1f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -39,25 +39,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
CSQueue parent;
final String queueName;
-
- float capacity;
- float maximumCapacity;
- float absoluteCapacity;
- float absoluteMaxCapacity;
- float absoluteUsedCapacity = 0.0f;
- float usedCapacity = 0.0f;
volatile int numContainers;
- final Resource minimumAllocation;
- final Resource maximumAllocation;
+ Resource minimumAllocation;
+ Resource maximumAllocation;
QueueState state;
final QueueMetrics metrics;
@@ -65,10 +57,6 @@ public abstract class AbstractCSQueue implements CSQueue {
Set accessibleLabels;
RMNodeLabelsManager labelManager;
String defaultLabelExpression;
- Map absoluteCapacityByNodeLabels;
- Map capacitiyByNodeLabels;
- Map absoluteMaxCapacityByNodeLabels;
- Map maxCapacityByNodeLabels;
Map acls =
new HashMap();
@@ -77,13 +65,16 @@ public abstract class AbstractCSQueue implements CSQueue {
// Track resource usage-by-label like used-resource/pending-resource, etc.
ResourceUsage queueUsage;
+ // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
+ // etc.
+ QueueCapacities queueCapacities;
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
+ protected CapacitySchedulerContext csContext;
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
- this.minimumAllocation = cs.getMinimumResourceCapability();
- this.maximumAllocation = cs.getMaximumResourceCapability();
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queueName = queueName;
@@ -94,65 +85,53 @@ public abstract class AbstractCSQueue implements CSQueue {
QueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics(),
cs.getConf());
-
- // get labels
- this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath());
- this.defaultLabelExpression = cs.getConfiguration()
- .getDefaultNodeLabelExpression(getQueuePath());
+ this.csContext = cs;
- // inherit from parent if labels not set
- if (this.accessibleLabels == null && parent != null) {
- this.accessibleLabels = parent.getAccessibleNodeLabels();
- }
- SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
- this.accessibleLabels);
-
- // inherit from parent if labels not set
- if (this.defaultLabelExpression == null && parent != null
- && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
- this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
- }
-
- // set capacity by labels
- capacitiyByNodeLabels =
- cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels,
- labelManager);
-
- // set maximum capacity by labels
- maxCapacityByNodeLabels =
- cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
- accessibleLabels, labelManager);
+ // initialize ResourceUsage
queueUsage = new ResourceUsage();
+
+ // initialize QueueCapacities
+ queueCapacities = new QueueCapacities(parent == null);
+ }
+
+ protected void setupConfigurableCapacities() {
+ CSQueueUtils.loadUpdateAndCheckCapacities(
+ getQueuePath(),
+ accessibleLabels,
+ csContext.getConfiguration(),
+ queueCapacities,
+ parent == null ? null : parent.getQueueCapacities(),
+ csContext.getRMContext().getNodeLabelManager());
}
@Override
public synchronized float getCapacity() {
- return capacity;
+ return queueCapacities.getCapacity();
}
@Override
public synchronized float getAbsoluteCapacity() {
- return absoluteCapacity;
+ return queueCapacities.getAbsoluteCapacity();
}
@Override
public float getAbsoluteMaximumCapacity() {
- return absoluteMaxCapacity;
+ return queueCapacities.getAbsoluteMaximumCapacity();
}
@Override
public synchronized float getAbsoluteUsedCapacity() {
- return absoluteUsedCapacity;
+ return queueCapacities.getAbsoluteUsedCapacity();
}
@Override
public float getMaximumCapacity() {
- return maximumCapacity;
+ return queueCapacities.getMaximumCapacity();
}
@Override
public synchronized float getUsedCapacity() {
- return usedCapacity;
+ return queueCapacities.getUsedCapacity();
}
@Override
@@ -210,12 +189,12 @@ public abstract class AbstractCSQueue implements CSQueue {
@Override
public synchronized void setUsedCapacity(float usedCapacity) {
- this.usedCapacity = usedCapacity;
+ queueCapacities.setUsedCapacity(usedCapacity);
}
@Override
public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
- this.absoluteUsedCapacity = absUsedCapacity;
+ queueCapacities.setAbsoluteUsedCapacity(absUsedCapacity);
}
/**
@@ -224,21 +203,16 @@ public abstract class AbstractCSQueue implements CSQueue {
*/
synchronized void setMaxCapacity(float maximumCapacity) {
// Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+ CSQueueUtils.checkMaxCapacity(getQueueName(),
+ queueCapacities.getCapacity(), maximumCapacity);
float absMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
- CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+ CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
+ queueCapacities.getAbsoluteCapacity(),
absMaxCapacity);
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absMaxCapacity;
- }
-
- @Override
- public float getAbsActualCapacity() {
- // for now, simply return actual capacity = guaranteed capacity for parent
- // queue
- return absoluteCapacity;
+ queueCapacities.setMaximumCapacity(maximumCapacity);
+ queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
}
@Override
@@ -246,39 +220,35 @@ public abstract class AbstractCSQueue implements CSQueue {
return defaultLabelExpression;
}
- synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
- float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
- QueueState state, Map acls,
- Set labels, String defaultLabelExpression,
- Map nodeLabelCapacities,
- Map maximumNodeLabelCapacities,
- boolean reservationContinueLooking)
+ synchronized void setupQueueConfigs(Resource clusterResource)
throws IOException {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
- absoluteMaxCapacity);
+ // get labels
+ this.accessibleLabels =
+ csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
+ this.defaultLabelExpression = csContext.getConfiguration()
+ .getDefaultNodeLabelExpression(getQueuePath());
- this.capacity = capacity;
- this.absoluteCapacity = absoluteCapacity;
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absoluteMaxCapacity;
-
- this.state = state;
-
- this.acls = acls;
+ // inherit from parent if labels not set
+ if (this.accessibleLabels == null && parent != null) {
+ this.accessibleLabels = parent.getAccessibleNodeLabels();
+ }
+ SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
+ this.accessibleLabels);
- // set labels
- this.accessibleLabels = labels;
+ // inherit from parent if labels not set
+ if (this.defaultLabelExpression == null && parent != null
+ && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
+ this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
+ }
+
+ // After we setup labels, we can setup capacities
+ setupConfigurableCapacities();
- // set label expression
- this.defaultLabelExpression = defaultLabelExpression;
+ this.minimumAllocation = csContext.getMinimumResourceCapability();
+ this.maximumAllocation = csContext.getMaximumResourceCapability();
- // copy node label capacity
- this.capacitiyByNodeLabels = new HashMap(nodeLabelCapacities);
- this.maxCapacityByNodeLabels =
- new HashMap(maximumNodeLabelCapacities);
+ this.state = csContext.getConfiguration().getState(getQueuePath());
+ this.acls = csContext.getConfiguration().getAcls(getQueuePath());
// Update metrics
CSQueueUtils.updateQueueStatistics(
@@ -305,30 +275,18 @@ public abstract class AbstractCSQueue implements CSQueue {
}
}
}
-
- // calculate absolute capacity by each node label
- this.absoluteCapacityByNodeLabels =
- CSQueueUtils.computeAbsoluteCapacityByNodeLabels(
- this.capacitiyByNodeLabels, parent);
-
- // calculate maximum capacity by each node label
- this.absoluteMaxCapacityByNodeLabels =
- CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels(
- maximumNodeLabelCapacities, parent);
-
- // check absoluteMaximumNodeLabelCapacities is valid
- CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(),
- absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels);
-
- this.reservationsContinueLooking = reservationContinueLooking;
+
+ this.reservationsContinueLooking = csContext.getConfiguration()
+ .getReservationContinueLook();
+
}
protected QueueInfo getQueueInfo() {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(queueName);
queueInfo.setAccessibleNodeLabels(accessibleLabels);
- queueInfo.setCapacity(capacity);
- queueInfo.setMaximumCapacity(maximumCapacity);
+ queueInfo.setCapacity(queueCapacities.getCapacity());
+ queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
queueInfo.setQueueState(state);
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity());
@@ -378,51 +336,6 @@ public abstract class AbstractCSQueue implements CSQueue {
--numContainers;
}
- @Private
- public float getCapacityByNodeLabel(String label) {
- if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
- if (null == parent) {
- return 1f;
- }
- return getCapacity();
- }
-
- if (!capacitiyByNodeLabels.containsKey(label)) {
- return 0f;
- } else {
- return capacitiyByNodeLabels.get(label);
- }
- }
-
- @Private
- public float getAbsoluteCapacityByNodeLabel(String label) {
- if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
- if (null == parent) {
- return 1f;
- }
- return getAbsoluteCapacity();
- }
-
- if (!absoluteCapacityByNodeLabels.containsKey(label)) {
- return 0f;
- } else {
- return absoluteCapacityByNodeLabels.get(label);
- }
- }
-
- @Private
- public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
- if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
- return getAbsoluteMaximumCapacity();
- }
-
- if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
- return 0f;
- } else {
- return absoluteMaxCapacityByNodeLabels.get(label);
- }
- }
-
@Private
public boolean getReservationContinueLooking() {
return reservationsContinueLooking;
@@ -432,14 +345,14 @@ public abstract class AbstractCSQueue implements CSQueue {
public Map getACLs() {
return acls;
}
-
+
@Private
- public Resource getUsedResourceByLabel(String nodeLabel) {
- return queueUsage.getUsed(nodeLabel);
+ public QueueCapacities getQueueCapacities() {
+ return queueCapacities;
}
- @VisibleForTesting
- public ResourceUsage getResourceUsage() {
+ @Private
+ public ResourceUsage getQueueResourceUsage() {
return queueUsage;
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 07a7e0e9a2f..1dfb87218a8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -75,15 +76,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* @return configured queue capacity
*/
public float getCapacity();
-
- /**
- * Get actual capacity of the queue, this may be different from
- * configured capacity when mis-config take place, like add labels to the
- * cluster
- *
- * @return actual queue capacity
- */
- public float getAbsActualCapacity();
/**
* Get capacity of the parent of the queue as a function of the
@@ -143,14 +135,6 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public Resource getUsedResources();
- /**
- * Get the currently utilized resources which allocated at nodes with label
- * specified
- *
- * @return used resources by the queue and it's children
- */
- public Resource getUsedResourceByLabel(String nodeLabel);
-
/**
* Get the current run-state of the queue
* @return current run-state
@@ -279,25 +263,16 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
*/
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer container);
-
- /**
- * Get absolute capacity by label of this queue can use
- * @param nodeLabel
- * @return absolute capacity by label of this queue can use
- */
- public float getAbsoluteCapacityByNodeLabel(String nodeLabel);
-
- /**
- * Get absolute max capacity by label of this queue can use
- * @param nodeLabel
- * @return absolute capacity by label of this queue can use
- */
- public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel);
/**
- * Get capacity by node label
- * @param nodeLabel
- * @return capacity by node label
+ * Get QueueCapacities of this queue
+ * @return queueCapacities
*/
- public float getCapacityByNodeLabel(String nodeLabel);
+ public QueueCapacities getQueueCapacities();
+
+ /**
+ * Get ResourceUsage of this queue
+ * @return resourceUsage
+ */
+ public ResourceUsage getQueueResourceUsage();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index f458057c35d..865b0b41979 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -17,13 +17,14 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -34,6 +35,9 @@ class CSQueueUtils {
final static float EPSILON = 0.0001f;
+ /*
+ * Used only by tests
+ */
public static void checkMaxCapacity(String queueName,
float capacity, float maximumCapacity) {
if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) {
@@ -43,6 +47,9 @@ class CSQueueUtils {
}
}
+ /*
+ * Used only by tests
+ */
public static void checkAbsoluteCapacity(String queueName,
float absCapacity, float absMaxCapacity) {
if (absMaxCapacity < (absCapacity - EPSILON)) {
@@ -53,19 +60,33 @@ class CSQueueUtils {
}
}
- public static void checkAbsoluteCapacitiesByLabel(String queueName,
- Map absCapacities,
- Map absMaximumCapacities) {
- for (Entry entry : absCapacities.entrySet()) {
- String label = entry.getKey();
- float absCapacity = entry.getValue();
- float absMaxCapacity = absMaximumCapacities.get(label);
- if (absMaxCapacity < (absCapacity - EPSILON)) {
- throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
- + "Queue '" + queueName + "' has " + "an absolute capacity ("
- + absCapacity + ") greater than "
- + "its absolute maximumCapacity (" + absMaxCapacity + ") of label="
- + label);
+ /**
+ * Check sanity of capacities:
+ * - capacity <= maxCapacity
+ * - absCapacity <= absMaximumCapacity
+ */
+ private static void capacitiesSanityCheck(String queueName,
+ QueueCapacities queueCapacities) {
+ for (String label : queueCapacities.getExistingNodeLabels()) {
+ float capacity = queueCapacities.getCapacity(label);
+ float maximumCapacity = queueCapacities.getMaximumCapacity(label);
+ if (capacity > maximumCapacity) {
+ throw new IllegalArgumentException("Illegal queue capacity setting, "
+ + "(capacity=" + capacity + ") > (maximum-capacity="
+ + maximumCapacity + "). When label=[" + label + "]");
+ }
+
+ // Actually, this may not needed since we have verified capacity <=
+ // maximumCapacity. And the way we compute absolute capacity (abs(x) =
+ // cap(x) * cap(x.parent) * ...) is a monotone increasing function. But
+ // just keep it here to make sure our compute abs capacity method works
+ // correctly.
+ float absCapacity = queueCapacities.getAbsoluteCapacity(label);
+ float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label);
+ if (absCapacity > absMaxCapacity) {
+ throw new IllegalArgumentException("Illegal queue capacity setting, "
+ + "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity="
+ + absMaxCapacity + "). When label=[" + label + "]");
}
}
}
@@ -77,37 +98,94 @@ class CSQueueUtils {
return (parentAbsMaxCapacity * maximumCapacity);
}
- public static Map computeAbsoluteCapacityByNodeLabels(
- Map nodeLabelToCapacities, CSQueue parent) {
- if (parent == null) {
- return nodeLabelToCapacities;
- }
-
- Map absoluteCapacityByNodeLabels =
- new HashMap();
- for (Entry entry : nodeLabelToCapacities.entrySet()) {
- String label = entry.getKey();
- float capacity = entry.getValue();
- absoluteCapacityByNodeLabels.put(label,
- capacity * parent.getAbsoluteCapacityByNodeLabel(label));
- }
- return absoluteCapacityByNodeLabels;
+ /**
+ * This method intends to be used by ReservationQueue, ReservationQueue will
+ * not appear in configuration file, so we shouldn't do load capacities
+ * settings in configuration for reservation queue.
+ */
+ public static void updateAndCheckCapacitiesByLabel(String queuePath,
+ QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
+ updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
+
+ capacitiesSanityCheck(queuePath, queueCapacities);
+ }
+
+ /**
+ * Do following steps for capacities
+ * - Load capacities from configuration
+ * - Update absolute capacities for new capacities
+ * - Check if capacities/absolute-capacities legal
+ */
+ public static void loadUpdateAndCheckCapacities(String queuePath,
+ Set accessibleLabels, CapacitySchedulerConfiguration csConf,
+ QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities,
+ RMNodeLabelsManager nlm) {
+ loadCapacitiesByLabelsFromConf(queuePath, accessibleLabels, nlm,
+ queueCapacities, csConf);
+
+ updateAbsoluteCapacitiesByNodeLabels(queueCapacities, parentQueueCapacities);
+
+ capacitiesSanityCheck(queuePath, queueCapacities);
}
- public static Map computeAbsoluteMaxCapacityByNodeLabels(
- Map maximumNodeLabelToCapacities, CSQueue parent) {
- if (parent == null) {
- return maximumNodeLabelToCapacities;
+ // Considered NO_LABEL, ANY and null cases
+ private static Set normalizeAccessibleNodeLabels(Set labels,
+ RMNodeLabelsManager mgr) {
+ Set accessibleLabels = new HashSet();
+ if (labels != null) {
+ accessibleLabels.addAll(labels);
}
- Map absoluteMaxCapacityByNodeLabels =
- new HashMap();
- for (Entry entry : maximumNodeLabelToCapacities.entrySet()) {
- String label = entry.getKey();
- float maxCapacity = entry.getValue();
- absoluteMaxCapacityByNodeLabels.put(label,
- maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label));
+ if (accessibleLabels.contains(CommonNodeLabelsManager.ANY)) {
+ accessibleLabels.addAll(mgr.getClusterNodeLabels());
+ }
+ accessibleLabels.add(CommonNodeLabelsManager.NO_LABEL);
+
+ return accessibleLabels;
+ }
+
+ private static void loadCapacitiesByLabelsFromConf(String queuePath,
+ Set labels, RMNodeLabelsManager mgr,
+ QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
+ queueCapacities.clearConfigurableFields();
+ labels = normalizeAccessibleNodeLabels(labels, mgr);
+
+ for (String label : labels) {
+ if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+ queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL,
+ csConf.getNonLabeledQueueCapacity(queuePath) / 100);
+ queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL,
+ csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100);
+ } else {
+ queueCapacities.setCapacity(label,
+ csConf.getLabeledQueueCapacity(queuePath, label) / 100);
+ queueCapacities.setMaximumCapacity(label,
+ csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100);
+ }
+ }
+ }
+
+ // Set absolute capacities for {capacity, maximum-capacity}
+ private static void updateAbsoluteCapacitiesByNodeLabels(
+ QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
+ for (String label : queueCapacities.getExistingNodeLabels()) {
+ float capacity = queueCapacities.getCapacity(label);
+ if (capacity > 0f) {
+ queueCapacities.setAbsoluteCapacity(
+ label,
+ capacity
+ * (parentQueueCapacities == null ? 1 : parentQueueCapacities
+ .getAbsoluteCapacity(label)));
+ }
+
+ float maxCapacity = queueCapacities.getMaximumCapacity(label);
+ if (maxCapacity > 0f) {
+ queueCapacities.setAbsoluteMaximumCapacity(
+ label,
+ maxCapacity
+ * (parentQueueCapacities == null ? 1 : parentQueueCapacities
+ .getAbsoluteMaximumCapacity(label)));
+ }
}
- return absoluteMaxCapacityByNodeLabels;
}
@Lock(CSQueue.class)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 23bf3814fa8..f5e86b37e5e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -287,6 +287,9 @@ public class CapacitySchedulerConfiguration extends Configuration {
}
private String getNodeLabelPrefix(String queue, String label) {
+ if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+ return getQueuePrefix(queue);
+ }
return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
}
@@ -325,7 +328,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
getMaximumApplicationMasterResourcePercent());
}
- public float getCapacity(String queue) {
+ public float getNonLabeledQueueCapacity(String queue) {
float capacity = queue.equals("root") ? 100.0f : getFloat(
getQueuePrefix(queue) + CAPACITY, UNDEFINED);
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
@@ -347,7 +350,7 @@ public class CapacitySchedulerConfiguration extends Configuration {
", capacity=" + capacity);
}
- public float getMaximumCapacity(String queue) {
+ public float getNonLabeledQueueMaximumCapacity(String queue) {
float maxCapacity = getFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY,
MAXIMUM_CAPACITY_VALUE);
maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ?
@@ -451,57 +454,29 @@ public class CapacitySchedulerConfiguration extends Configuration {
return Collections.unmodifiableSet(set);
}
- public Map getNodeLabelCapacities(String queue,
- Set labels, RMNodeLabelsManager mgr) {
- Map nodeLabelCapacities = new HashMap();
-
- if (labels == null) {
- return nodeLabelCapacities;
+ private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
+ float defaultValue) {
+ String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
+ float capacity = getFloat(capacityPropertyName, defaultValue);
+ if (capacity < MINIMUM_CAPACITY_VALUE
+ || capacity > MAXIMUM_CAPACITY_VALUE) {
+ throw new IllegalArgumentException("Illegal capacity of " + capacity
+ + " for node-label=" + label + " in queue=" + queue
+ + ", valid capacity should in range of [0, 100].");
}
-
- for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
- .getClusterNodeLabels() : labels) {
- String capacityPropertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
- float capacity = getFloat(capacityPropertyName, 0f);
- if (capacity < MINIMUM_CAPACITY_VALUE
- || capacity > MAXIMUM_CAPACITY_VALUE) {
- throw new IllegalArgumentException("Illegal capacity of " + capacity
- + " for node-label=" + label + " in queue=" + queue
- + ", valid capacity should in range of [0, 100].");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("CSConf - getCapacityOfLabel: prefix="
- + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
- }
-
- nodeLabelCapacities.put(label, capacity / 100f);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
}
- return nodeLabelCapacities;
+ return capacity;
}
- public Map getMaximumNodeLabelCapacities(String queue,
- Set labels, RMNodeLabelsManager mgr) {
- Map maximumNodeLabelCapacities = new HashMap();
- if (labels == null) {
- return maximumNodeLabelCapacities;
- }
-
- for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
- .getClusterNodeLabels() : labels) {
- float maxCapacity =
- getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
- 100f);
- if (maxCapacity < MINIMUM_CAPACITY_VALUE
- || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
- throw new IllegalArgumentException("Illegal " + "capacity of "
- + maxCapacity + " for label=" + label + " in queue=" + queue);
- }
- LOG.debug("CSConf - getCapacityOfLabel: prefix="
- + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity);
-
- maximumNodeLabelCapacities.put(label, maxCapacity / 100f);
- }
- return maximumNodeLabelCapacities;
+ public float getLabeledQueueCapacity(String queue, String label) {
+ return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f);
+ }
+
+ public float getLabeledQueueMaximumCapacity(String queue, String label) {
+ return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f);
}
public String getDefaultNodeLabelExpression(String queue) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6fbc8e45d44..f9f9ca92ae0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -97,7 +97,7 @@ public class LeafQueue extends AbstractCSQueue {
Set pendingApplications;
- private final float minimumAllocationFactor;
+ private float minimumAllocationFactor;
private Map users = new HashMap();
@@ -123,53 +123,7 @@ public class LeafQueue extends AbstractCSQueue {
super(cs, queueName, parent, old);
this.scheduler = cs;
- this.activeUsersManager = new ActiveUsersManager(metrics);
- this.minimumAllocationFactor =
- Resources.ratio(resourceCalculator,
- Resources.subtract(maximumAllocation, minimumAllocation),
- maximumAllocation);
-
- float capacity = getCapacityFromConf();
- float absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
-
- float maximumCapacity =
- (float)cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
- float absoluteMaxCapacity =
- CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
-
- // Initially set to absoluteMax, will be updated to more accurate
- // max avail value during assignContainers
- absoluteMaxAvailCapacity = absoluteMaxCapacity;
-
- int userLimit = cs.getConfiguration().getUserLimit(getQueuePath());
- float userLimitFactor =
- cs.getConfiguration().getUserLimitFactor(getQueuePath());
-
- int maxApplications =
- cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
- if (maxApplications < 0) {
- int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
- maxApplications = (int)(maxSystemApps * absoluteCapacity);
- }
- maxApplicationsPerUser =
- (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
-
- float maxAMResourcePerQueuePercent = cs.getConfiguration()
- .getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
-
- QueueState state = cs.getConfiguration().getState(getQueuePath());
-
- Map acls =
- cs.getConfiguration().getAcls(getQueuePath());
-
- setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
- maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
- state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
- defaultLabelExpression, this.capacitiyByNodeLabels,
- this.maxCapacityByNodeLabels,
- cs.getConfiguration().getReservationContinueLook());
-
+ this.activeUsersManager = new ActiveUsersManager(metrics);
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
@@ -180,34 +134,13 @@ public class LeafQueue extends AbstractCSQueue {
this.pendingApplications =
new TreeSet(applicationComparator);
this.activeApplications = new TreeSet(applicationComparator);
- }
-
- // externalizing in method, to allow overriding
- protected float getCapacityFromConf() {
- return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
+
+ setupQueueConfigs(cs.getClusterResource());
}
- protected synchronized void setupQueueConfigs(
- Resource clusterResource,
- float capacity, float absoluteCapacity,
- float maximumCapacity, float absoluteMaxCapacity,
- int userLimit, float userLimitFactor,
- int maxApplications, float maxAMResourcePerQueuePercent,
- int maxApplicationsPerUser, QueueState state,
- Map acls, int nodeLocalityDelay,
- Set labels, String defaultLabelExpression,
- Map capacitieByLabel,
- Map maximumCapacitiesByLabel,
- boolean revervationContinueLooking) throws IOException {
- super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, state, acls, labels,
- defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
- revervationContinueLooking);
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- float absCapacity = getParent().getAbsoluteCapacity() * capacity;
- CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
- absoluteMaxCapacity);
+ protected synchronized void setupQueueConfigs(Resource clusterResource)
+ throws IOException {
+ super.setupQueueConfigs(clusterResource);
this.lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
@@ -217,16 +150,28 @@ public class LeafQueue extends AbstractCSQueue {
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
- updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
+ updateHeadroomInfo(clusterResource,
+ queueCapacities.getAbsoluteMaximumCapacity());
- this.absoluteCapacity = absCapacity;
+ CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+ userLimit = conf.getUserLimit(getQueuePath());
+ userLimitFactor = conf.getUserLimitFactor(getQueuePath());
- this.userLimit = userLimit;
- this.userLimitFactor = userLimitFactor;
+ // Initially set to absoluteMax, will be updated to more accurate
+ // max avail value during assignContainers
+ absoluteMaxAvailCapacity = queueCapacities.getAbsoluteMaximumCapacity();
- this.maxApplications = maxApplications;
- this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
- this.maxApplicationsPerUser = maxApplicationsPerUser;
+ maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
+ if (maxApplications < 0) {
+ int maxSystemApps = conf.getMaximumSystemApplications();
+ maxApplications =
+ (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
+ }
+ maxApplicationsPerUser =
+ (int)(maxApplications * (userLimit / 100.0f) * userLimitFactor);
+
+ maxAMResourcePerQueuePercent =
+ conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression)) {
@@ -242,7 +187,12 @@ public class LeafQueue extends AbstractCSQueue {
getAccessibleNodeLabels().iterator(), ',')));
}
- this.nodeLocalityDelay = nodeLocalityDelay;
+ nodeLocalityDelay = conf.getNodeLocalityDelay();
+
+ this.minimumAllocationFactor =
+ Resources.ratio(resourceCalculator,
+ Resources.subtract(maximumAllocation, minimumAllocation),
+ maximumAllocation);
StringBuilder aclsString = new StringBuilder();
for (Map.Entry e : acls.entrySet()) {
@@ -250,21 +200,21 @@ public class LeafQueue extends AbstractCSQueue {
}
StringBuilder labelStrBuilder = new StringBuilder();
- if (labels != null) {
- for (String s : labels) {
+ if (accessibleLabels != null) {
+ for (String s : accessibleLabels) {
labelStrBuilder.append(s);
labelStrBuilder.append(",");
}
}
LOG.info("Initializing " + queueName + "\n" +
- "capacity = " + capacity +
+ "capacity = " + queueCapacities.getCapacity() +
" [= (float) configuredCapacity / 100 ]" + "\n" +
- "asboluteCapacity = " + absoluteCapacity +
+ "asboluteCapacity = " + queueCapacities.getAbsoluteCapacity() +
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
- "maxCapacity = " + maximumCapacity +
+ "maxCapacity = " + queueCapacities.getMaximumCapacity() +
" [= configuredMaxCapacity ]" + "\n" +
- "absoluteMaxCapacity = " + absoluteMaxCapacity +
+ "absoluteMaxCapacity = " + queueCapacities.getAbsoluteMaximumCapacity() +
" [= 1.0 maximumCapacity undefined, " +
"(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" +
"\n" +
@@ -279,7 +229,7 @@ public class LeafQueue extends AbstractCSQueue {
"maxApplicationsPerUser = " + maxApplicationsPerUser +
" [= (int)(maxApplications * (userLimit / 100.0f) * " +
"userLimitFactor) ]" + "\n" +
- "usedCapacity = " + usedCapacity +
+ "usedCapacity = " + queueCapacities.getUsedCapacity() +
" [= usedResourcesMemory / " +
"(clusterResourceMemory * absoluteCapacity)]" + "\n" +
"absoluteUsedCapacity = " + absoluteUsedCapacity +
@@ -435,8 +385,8 @@ public class LeafQueue extends AbstractCSQueue {
public String toString() {
return queueName + ": " +
- "capacity=" + capacity + ", " +
- "absoluteCapacity=" + absoluteCapacity + ", " +
+ "capacity=" + queueCapacities.getCapacity() + ", " +
+ "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
"usedResources=" + queueUsage.getUsed() + ", " +
"usedCapacity=" + getUsedCapacity() + ", " +
"absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " +
@@ -483,23 +433,7 @@ public class LeafQueue extends AbstractCSQueue {
" from " + newlyParsedQueue.getQueuePath());
}
- LeafQueue newlyParsedLeafQueue = (LeafQueue)newlyParsedQueue;
- setupQueueConfigs(
- clusterResource,
- newlyParsedLeafQueue.capacity, newlyParsedLeafQueue.absoluteCapacity,
- newlyParsedLeafQueue.maximumCapacity,
- newlyParsedLeafQueue.absoluteMaxCapacity,
- newlyParsedLeafQueue.userLimit, newlyParsedLeafQueue.userLimitFactor,
- newlyParsedLeafQueue.maxApplications,
- newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
- newlyParsedLeafQueue.getMaxApplicationsPerUser(),
- newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
- newlyParsedLeafQueue.getNodeLocalityDelay(),
- newlyParsedLeafQueue.accessibleLabels,
- newlyParsedLeafQueue.defaultLabelExpression,
- newlyParsedLeafQueue.capacitiyByNodeLabels,
- newlyParsedLeafQueue.maxCapacityByNodeLabels,
- newlyParsedLeafQueue.reservationsContinueLooking);
+ setupQueueConfigs(clusterResource);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@@ -1022,7 +956,8 @@ public class LeafQueue extends AbstractCSQueue {
application.getCurrentReservation()),
labelManager.getResourceByLabel(label, clusterResource));
- if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
+ if (potentialNewWithoutReservedCapacity <= queueCapacities
+ .getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: "
+ getQueueName()
@@ -1036,8 +971,9 @@ public class LeafQueue extends AbstractCSQueue {
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(), clusterResource) + " required " + required
+ " potentialNewWithoutReservedCapacity: "
- + potentialNewWithoutReservedCapacity + " ( " + " max-capacity: "
- + absoluteMaxCapacity + ")");
+ + potentialNewWithoutReservedCapacity + " ( "
+ + " max-capacity: "
+ + queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
@@ -1046,7 +982,8 @@ public class LeafQueue extends AbstractCSQueue {
// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
- if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) {
+ if (potentialNewCapacity > queueCapacities
+ .getAbsoluteMaximumCapacity(label) + 1e-4) {
canAssign = false;
break;
}
@@ -1061,7 +998,8 @@ public class LeafQueue extends AbstractCSQueue {
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
- + " max-capacity: " + absoluteMaxCapacity + ")");
+ + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
+ + ")");
}
}
@@ -1144,7 +1082,7 @@ public class LeafQueue extends AbstractCSQueue {
Resources.multiplyAndNormalizeUp(resourceCalculator,
labelManager.getResourceByLabel(firstLabel,
clusterResource),
- getAbsoluteCapacityByNodeLabel(firstLabel),
+ queueCapacities.getAbsoluteCapacity(firstLabel),
minimumAllocation));
} else {
// else there's no label on request, just to use absolute capacity as
@@ -1152,7 +1090,7 @@ public class LeafQueue extends AbstractCSQueue {
queueCapacity =
Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
.getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
- absoluteCapacity, minimumAllocation);
+ queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
// Allow progress for queues with miniscule capacity
@@ -1797,12 +1735,9 @@ public class LeafQueue extends AbstractCSQueue {
}
private void updateAbsoluteCapacityResource(Resource clusterResource) {
-
- absoluteCapacityResource = Resources.multiplyAndNormalizeUp(
- resourceCalculator,
- clusterResource,
- absoluteCapacity, minimumAllocation);
-
+ absoluteCapacityResource =
+ Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
+ queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
@Override
@@ -1813,7 +1748,8 @@ public class LeafQueue extends AbstractCSQueue {
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
- updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
+ updateHeadroomInfo(clusterResource,
+ queueCapacities.getAbsoluteMaximumCapacity());
// Update metrics
CSQueueUtils.updateQueueStatistics(
@@ -1986,35 +1922,13 @@ public class LeafQueue extends AbstractCSQueue {
getParent().detachContainer(clusterResource, application, rmContainer);
}
}
-
- @Override
- public float getAbsActualCapacity() {
- //? Is this actually used by anything at present?
- // There is a findbugs warning -re lastClusterResource (now excluded),
- // when this is used, verify that the access is mt correct and remove
- // the findbugs exclusion if possible
- if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
- lastClusterResource, Resources.none())) {
- return absoluteCapacity;
- }
-
- Resource resourceRespectLabels =
- labelManager == null ? lastClusterResource : labelManager
- .getQueueResource(queueName, accessibleLabels, lastClusterResource);
- float absActualCapacity =
- Resources.divide(resourceCalculator, lastClusterResource,
- resourceRespectLabels, lastClusterResource);
-
- return absActualCapacity > absoluteCapacity ? absoluteCapacity
- : absActualCapacity;
- }
public void setCapacity(float capacity) {
- this.capacity = capacity;
+ queueCapacities.setCapacity(capacity);
}
public void setAbsoluteCapacity(float absoluteCapacity) {
- this.absoluteCapacity = absoluteCapacity;
+ queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
public void setMaxApplications(int maxApplications) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index de92c9c9ab9..5f7db782986 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -86,7 +86,7 @@ public class ParentQueue extends AbstractCSQueue {
this.rootQueue = (parent == null);
- float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
+ float rawCapacity = cs.getConfiguration().getNonLabeledQueueCapacity(getQueuePath());
if (rootQueue &&
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
@@ -94,46 +94,20 @@ public class ParentQueue extends AbstractCSQueue {
"capacity of " + rawCapacity + " for queue " + queueName +
". Must be " + CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE);
}
-
- float capacity = (float) rawCapacity / 100;
- float parentAbsoluteCapacity =
- (rootQueue) ? 1.0f : parent.getAbsoluteCapacity();
- float absoluteCapacity = parentAbsoluteCapacity * capacity;
-
- float maximumCapacity =
- (float) cs.getConfiguration().getMaximumCapacity(getQueuePath()) / 100;
- float absoluteMaxCapacity =
- CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
-
- QueueState state = cs.getConfiguration().getState(getQueuePath());
-
- Map acls =
- cs.getConfiguration().getAcls(getQueuePath());
-
- setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
- defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels,
- cs.getConfiguration().getReservationContinueLook());
this.childQueues = new TreeSet(queueComparator);
+
+ setupQueueConfigs(cs.getClusterResource());
LOG.info("Initialized parent-queue " + queueName +
" name=" + queueName +
", fullname=" + getQueuePath());
}
- synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
- float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
- QueueState state, Map acls,
- Set accessibleLabels, String defaultLabelExpression,
- Map nodeLabelCapacities,
- Map maximumCapacitiesByLabel,
- boolean reservationContinueLooking) throws IOException {
- super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
- defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
- reservationContinueLooking);
- StringBuilder aclsString = new StringBuilder();
+ synchronized void setupQueueConfigs(Resource clusterResource)
+ throws IOException {
+ super.setupQueueConfigs(clusterResource);
+ StringBuilder aclsString = new StringBuilder();
for (Map.Entry e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
@@ -147,10 +121,10 @@ public class ParentQueue extends AbstractCSQueue {
}
LOG.info(queueName +
- ", capacity=" + capacity +
- ", asboluteCapacity=" + absoluteCapacity +
- ", maxCapacity=" + maximumCapacity +
- ", asboluteMaxCapacity=" + absoluteMaxCapacity +
+ ", capacity=" + this.queueCapacities.getCapacity() +
+ ", asboluteCapacity=" + this.queueCapacities.getAbsoluteCapacity() +
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity() +
+ ", asboluteMaxCapacity=" + this.queueCapacities.getAbsoluteMaximumCapacity() +
", state=" + state +
", acls=" + aclsString +
", labels=" + labelStrBuilder.toString() + "\n" +
@@ -166,19 +140,19 @@ public class ParentQueue extends AbstractCSQueue {
}
float delta = Math.abs(1.0f - childCapacities); // crude way to check
// allow capacities being set to 0, and enforce child 0 if parent is 0
- if (((capacity > 0) && (delta > PRECISION)) ||
- ((capacity == 0) && (childCapacities > 0))) {
+ if (((queueCapacities.getCapacity() > 0) && (delta > PRECISION)) ||
+ ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
throw new IllegalArgumentException("Illegal" +
" capacity of " + childCapacities +
" for children of queue " + queueName);
}
// check label capacities
for (String nodeLabel : labelManager.getClusterNodeLabels()) {
- float capacityByLabel = getCapacityByNodeLabel(nodeLabel);
+ float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
// check children's labels
float sum = 0;
for (CSQueue queue : childQueues) {
- sum += queue.getCapacityByNodeLabel(nodeLabel);
+ sum += queue.getQueueCapacities().getCapacity(nodeLabel);
}
if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
|| (capacityByLabel == 0) && (sum > 0)) {
@@ -254,8 +228,8 @@ public class ParentQueue extends AbstractCSQueue {
public String toString() {
return queueName + ": " +
"numChildQueue= " + childQueues.size() + ", " +
- "capacity=" + capacity + ", " +
- "absoluteCapacity=" + absoluteCapacity + ", " +
+ "capacity=" + queueCapacities.getCapacity() + ", " +
+ "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
"usedResources=" + queueUsage.getUsed() +
"usedCapacity=" + getUsedCapacity() + ", " +
"numApps=" + getNumApplications() + ", " +
@@ -263,9 +237,8 @@ public class ParentQueue extends AbstractCSQueue {
}
@Override
- public synchronized void reinitialize(
- CSQueue newlyParsedQueue, Resource clusterResource)
- throws IOException {
+ public synchronized void reinitialize(CSQueue newlyParsedQueue,
+ Resource clusterResource) throws IOException {
// Sanity check
if (!(newlyParsedQueue instanceof ParentQueue) ||
!newlyParsedQueue.getQueuePath().equals(getQueuePath())) {
@@ -276,18 +249,7 @@ public class ParentQueue extends AbstractCSQueue {
ParentQueue newlyParsedParentQueue = (ParentQueue)newlyParsedQueue;
// Set new configs
- setupQueueConfigs(clusterResource,
- newlyParsedParentQueue.capacity,
- newlyParsedParentQueue.absoluteCapacity,
- newlyParsedParentQueue.maximumCapacity,
- newlyParsedParentQueue.absoluteMaxCapacity,
- newlyParsedParentQueue.state,
- newlyParsedParentQueue.acls,
- newlyParsedParentQueue.accessibleLabels,
- newlyParsedParentQueue.defaultLabelExpression,
- newlyParsedParentQueue.capacitiyByNodeLabels,
- newlyParsedParentQueue.maxCapacityByNodeLabels,
- newlyParsedParentQueue.reservationsContinueLooking);
+ setupQueueConfigs(clusterResource);
// Re-configure existing child queues and add new ones
// The CS has already checked to ensure all existing child queues are present!
@@ -512,7 +474,7 @@ public class ParentQueue extends AbstractCSQueue {
labelManager.getResourceByLabel(label, clusterResource));
// if any of the label doesn't beyond limit, we can allocate on this node
if (currentAbsoluteLabelUsedCapacity >=
- getAbsoluteMaximumCapacityByNodeLabel(label)) {
+ queueCapacities.getAbsoluteMaximumCapacity(label)) {
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
+ " current-capacity (" + queueUsage.getUsed(label) + ") "
@@ -540,7 +502,8 @@ public class ParentQueue extends AbstractCSQueue {
Resources.subtract(queueUsage.getUsed(), reservedResources),
clusterResource);
- if (capacityWithoutReservedCapacity <= absoluteMaxCapacity) {
+ if (capacityWithoutReservedCapacity <= queueCapacities
+ .getAbsoluteMaximumCapacity()) {
if (LOG.isDebugEnabled()) {
LOG.debug("parent: try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed().getMemory()
@@ -550,7 +513,7 @@ public class ParentQueue extends AbstractCSQueue {
/ clusterResource.getMemory()
+ " potentialNewWithoutReservedCapacity: "
+ capacityWithoutReservedCapacity + " ( " + " max-capacity: "
- + absoluteMaxCapacity + ")");
+ + queueCapacities.getAbsoluteMaximumCapacity() + ")");
}
// we could potentially use this node instead of reserved node
return true;
@@ -760,13 +723,6 @@ public class ParentQueue extends AbstractCSQueue {
}
}
}
-
- @Override
- public float getAbsActualCapacity() {
- // for now, simply return actual capacity = guaranteed capacity for parent
- // queue
- return absoluteCapacity;
- }
public synchronized int getNumApplications() {
return numApplications;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index 0725959ccaa..f91a6d84cd8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -19,8 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -101,16 +99,7 @@ public class PlanQueue extends ParentQueue {
}
// Set new configs
- setupQueueConfigs(clusterResource, newlyParsedParentQueue.getCapacity(),
- newlyParsedParentQueue.getAbsoluteCapacity(),
- newlyParsedParentQueue.getMaximumCapacity(),
- newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
- newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(),
- newlyParsedParentQueue.accessibleLabels,
- newlyParsedParentQueue.defaultLabelExpression,
- newlyParsedParentQueue.capacitiyByNodeLabels,
- newlyParsedParentQueue.maxCapacityByNodeLabels,
- newlyParsedParentQueue.getReservationContinueLooking());
+ setupQueueConfigs(clusterResource);
updateQuotas(newlyParsedParentQueue.userLimit,
newlyParsedParentQueue.userLimitFactor,
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
index a0e6d8c9b17..962a636eb02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueCapacities.java
@@ -19,12 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+
+import com.google.common.collect.Sets;
public class QueueCapacities {
private static final String NL = CommonNodeLabelsManager.NO_LABEL;
@@ -32,13 +38,15 @@ public class QueueCapacities {
private Map capacitiesMap;
private ReadLock readLock;
private WriteLock writeLock;
+ private final boolean isRoot;
- public QueueCapacities() {
+ public QueueCapacities(boolean isRoot) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
readLock = lock.readLock();
writeLock = lock.writeLock();
capacitiesMap = new HashMap();
+ this.isRoot = isRoot;
}
// Usage enum here to make implement cleaner
@@ -58,6 +66,18 @@ public class QueueCapacities {
public Capacities() {
capacitiesArr = new float[CapacityType.values().length];
}
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{used=" + capacitiesArr[0] + "%, ");
+ sb.append("abs_used=" + capacitiesArr[1] + "%, ");
+ sb.append("max_cap=" + capacitiesArr[2] + "%, ");
+ sb.append("abs_max_cap=" + capacitiesArr[3] + "%, ");
+ sb.append("cap=" + capacitiesArr[4] + "%, ");
+ sb.append("abs_cap=" + capacitiesArr[5] + "%}");
+ return sb.toString();
+ }
}
private float _get(String label, CapacityType type) {
@@ -127,6 +147,10 @@ public class QueueCapacities {
}
public float getCapacity(String label) {
+ if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL) && isRoot) {
+ return 1f;
+ }
+
return _get(label, CapacityType.CAP);
}
@@ -144,6 +168,9 @@ public class QueueCapacities {
}
public float getAbsoluteCapacity(String label) {
+ if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL) && isRoot) {
+ return 1f;
+ }
return _get(label, CapacityType.ABS_CAP);
}
@@ -188,4 +215,43 @@ public class QueueCapacities {
public void setAbsoluteMaximumCapacity(String label, float value) {
_set(label, CapacityType.ABS_MAX_CAP, value);
}
+
+ /**
+ * Clear configurable fields, like
+ * (absolute)capacity/(absolute)maximum-capacity, this will be used by queue
+ * reinitialize, when we reinitialize a queue, we will first clear all
+ * configurable fields, and load new values
+ */
+ public void clearConfigurableFields() {
+ try {
+ writeLock.lock();
+ for (String label : capacitiesMap.keySet()) {
+ _set(label, CapacityType.CAP, 0);
+ _set(label, CapacityType.MAX_CAP, 0);
+ _set(label, CapacityType.ABS_CAP, 0);
+ _set(label, CapacityType.ABS_MAX_CAP, 0);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Set getExistingNodeLabels() {
+ try {
+ readLock.lock();
+ return new HashSet(capacitiesMap.keySet());
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public String toString() {
+ try {
+ readLock.lock();
+ return this.capacitiesMap.toString();
+ } finally {
+ readLock.unlock();
+ }
+ }
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index c4424b57df4..a8d17cfd0d1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -62,11 +62,11 @@ public class ReservationQueue extends LeafQueue {
throw new IOException("Trying to reinitialize " + getQueuePath()
+ " from " + newlyParsedQueue.getQueuePath());
}
+ super.reinitialize(newlyParsedQueue, clusterResource);
CSQueueUtils.updateQueueStatistics(
parent.schedulerContext.getResourceCalculator(), newlyParsedQueue,
parent, parent.schedulerContext.getClusterResource(),
parent.schedulerContext.getMinimumResourceCapability());
- super.reinitialize(newlyParsedQueue, clusterResource);
updateQuotas(parent.getUserLimitForReservation(),
parent.getUserLimitFactor(),
parent.getMaxApplicationsForReservations(),
@@ -108,9 +108,9 @@ public class ReservationQueue extends LeafQueue {
maxApplicationsPerUser = maxAppsPerUserForReservation;
}
- // used by the super constructor, we initialize to zero
- protected float getCapacityFromConf() {
- return 0f;
+ @Override
+ protected void setupConfigurableCapacities() {
+ CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(),
+ queueCapacities, parent == null ? null : parent.getQueueCapacities());
}
-
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
index d643c9dc0ef..5135ba9be9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSQueueUtils.java
@@ -202,33 +202,33 @@ public class TestCSQueueUtils {
LOG.info("t2 l2q2 " + result);
//some usage, but below the base capacity
- root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
- l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
+ root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
+ l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.4f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//usage gt base on parent sibling
- root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
- l1q2.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
+ root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
+ l1q2.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.3f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//same as last, but with usage also on direct parent
- root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
- l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
+ root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
+ l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//add to direct sibling, below the threshold of effect at present
- root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
- l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
- l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+ root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+ l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+ l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
@@ -236,9 +236,9 @@ public class TestCSQueueUtils {
//add to direct sibling, now above the threshold of effect
//(it's cumulative with prior tests)
- root.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
- l1q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
- l2q1.getResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+ root.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+ l1q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
+ l2q1.getQueueResourceUsage().incUsed(Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.1f, result, 0.000001f);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index edcd871c880..456178d4358 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -389,11 +389,11 @@ public class TestCapacityScheduler {
public void testMaximumCapacitySetup() {
float delta = 0.0000001f;
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
- assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta);
+ assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
conf.setMaximumCapacity(A, 50.0f);
- assertEquals(50.0f, conf.getMaximumCapacity(A),delta);
+ assertEquals(50.0f, conf.getNonLabeledQueueMaximumCapacity(A),delta);
conf.setMaximumCapacity(A, -1);
- assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getMaximumCapacity(A),delta);
+ assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 261fa013073..4853241ec6e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -91,7 +91,8 @@ public class TestCapacitySchedulerNodeLabelUpdate {
String label) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
CSQueue queue = scheduler.getQueue(queueName);
- Assert.assertEquals(memory, queue.getUsedResourceByLabel(label).getMemory());
+ Assert.assertEquals(memory, queue.getQueueResourceUsage().getUsed(label)
+ .getMemory());
}
@Test (timeout = 30000)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java
index 89a531162c6..a468230dd10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueCapacities.java
@@ -99,7 +99,7 @@ public class TestQueueCapacities {
}
private void internalTestModifyAndRead(String label) throws Exception {
- QueueCapacities qc = new QueueCapacities();
+ QueueCapacities qc = new QueueCapacities(false);
// First get returns 0 always
Assert.assertEquals(0f, get(qc, suffix, label), 1e-8);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
index cf2e5cedf8f..fc03581e54a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueParsing.java
@@ -418,18 +418,18 @@ public class TestQueueParsing {
// check capacity of A2
CSQueue qA2 = capacityScheduler.getQueue("a2");
Assert.assertEquals(0.7, qA2.getCapacity(), DELTA);
- Assert.assertEquals(0.5, qA2.getCapacityByNodeLabel("red"), DELTA);
+ Assert.assertEquals(0.5, qA2.getQueueCapacities().getCapacity("red"), DELTA);
Assert.assertEquals(0.07, qA2.getAbsoluteCapacity(), DELTA);
- Assert.assertEquals(0.25, qA2.getAbsoluteCapacityByNodeLabel("red"), DELTA);
+ Assert.assertEquals(0.25, qA2.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.1275, qA2.getAbsoluteMaximumCapacity(), DELTA);
- Assert.assertEquals(0.3, qA2.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
+ Assert.assertEquals(0.3, qA2.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
// check capacity of B3
CSQueue qB3 = capacityScheduler.getQueue("b3");
Assert.assertEquals(0.18, qB3.getAbsoluteCapacity(), DELTA);
- Assert.assertEquals(0.125, qB3.getAbsoluteCapacityByNodeLabel("red"), DELTA);
+ Assert.assertEquals(0.125, qB3.getQueueCapacities().getAbsoluteCapacity("red"), DELTA);
Assert.assertEquals(0.35, qB3.getAbsoluteMaximumCapacity(), DELTA);
- Assert.assertEquals(1, qB3.getAbsoluteMaximumCapacityByNodeLabel("red"), DELTA);
+ Assert.assertEquals(1, qB3.getQueueCapacities().getAbsoluteMaximumCapacity("red"), DELTA);
}
private void
@@ -641,8 +641,8 @@ public class TestQueueParsing {
// check root queue's capacity by label -- they should be all zero
CSQueue root = capacityScheduler.getQueue(CapacitySchedulerConfiguration.ROOT);
- Assert.assertEquals(0, root.getCapacityByNodeLabel("red"), DELTA);
- Assert.assertEquals(0, root.getCapacityByNodeLabel("blue"), DELTA);
+ Assert.assertEquals(0, root.getQueueCapacities().getCapacity("red"), DELTA);
+ Assert.assertEquals(0, root.getQueueCapacities().getCapacity("blue"), DELTA);
CSQueue a = capacityScheduler.getQueue("a");
Assert.assertEquals(0.10, a.getAbsoluteCapacity(), DELTA);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index d418dab34a6..63a48bb00e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -395,9 +395,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
String qshortName = qArr[qArr.length - 1];
assertEquals("usedCapacity doesn't match", 0, info.usedCapacity, 1e-3f);
- assertEquals("capacity doesn't match", csConf.getCapacity(q),
+ assertEquals("capacity doesn't match", csConf.getNonLabeledQueueCapacity(q),
info.capacity, 1e-3f);
- float expectCapacity = csConf.getMaximumCapacity(q);
+ float expectCapacity = csConf.getNonLabeledQueueMaximumCapacity(q);
float expectAbsMaxCapacity = parentAbsMaxCapacity * (info.maxCapacity/100);
if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) {
expectCapacity = 100;