YARN-10486. FS-CS converter: handle case when weight=0 and allow more lenient capacity checks in Capacity Scheduler. Contributed by Peter Bacsko
This commit is contained in:
parent
ce7827c82a
commit
5ff70a59c4
|
@ -387,6 +387,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
|
|
||||||
public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
|
public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
|
||||||
|
|
||||||
|
public static final String ALLOW_ZERO_CAPACITY_SUM =
|
||||||
|
"allow-zero-capacity-sum";
|
||||||
|
|
||||||
|
public static final boolean DEFAULT_ALLOW_ZERO_CAPACITY_SUM = false;
|
||||||
/**
|
/**
|
||||||
* Different resource types supported.
|
* Different resource types supported.
|
||||||
*/
|
*/
|
||||||
|
@ -1488,6 +1492,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
: defaultMaxParallelAppsForUser;
|
: defaultMaxParallelAppsForUser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean getAllowZeroCapacitySum(String queue) {
|
||||||
|
return getBoolean(getQueuePrefix(queue)
|
||||||
|
+ ALLOW_ZERO_CAPACITY_SUM, DEFAULT_ALLOW_ZERO_CAPACITY_SUM);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setAllowZeroCapacitySum(String queue, boolean value) {
|
||||||
|
setBoolean(getQueuePrefix(queue)
|
||||||
|
+ ALLOW_ZERO_CAPACITY_SUM, value);
|
||||||
|
}
|
||||||
private static final String PREEMPTION_CONFIG_PREFIX =
|
private static final String PREEMPTION_CONFIG_PREFIX =
|
||||||
"yarn.resourcemanager.monitor.capacity.preemption.";
|
"yarn.resourcemanager.monitor.capacity.preemption.";
|
||||||
|
|
||||||
|
|
|
@ -95,6 +95,8 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
|
|
||||||
private int runnableApps;
|
private int runnableApps;
|
||||||
|
|
||||||
|
private final boolean allowZeroCapacitySum;
|
||||||
|
|
||||||
public ParentQueue(CapacitySchedulerContext cs,
|
public ParentQueue(CapacitySchedulerContext cs,
|
||||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||||
super(cs, queueName, parent, old);
|
super(cs, queueName, parent, old);
|
||||||
|
@ -111,6 +113,8 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
this.childQueues = new ArrayList<>();
|
this.childQueues = new ArrayList<>();
|
||||||
|
this.allowZeroCapacitySum =
|
||||||
|
cs.getConfiguration().getAllowZeroCapacitySum(getQueuePath());
|
||||||
|
|
||||||
setupQueueConfigs(cs.getClusterResource());
|
setupQueueConfigs(cs.getClusterResource());
|
||||||
|
|
||||||
|
@ -159,7 +163,8 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
|
+ aclsString + ", labels=" + labelStrBuilder.toString() + "\n"
|
||||||
+ ", reservationsContinueLooking=" + reservationsContinueLooking
|
+ ", reservationsContinueLooking=" + reservationsContinueLooking
|
||||||
+ ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
|
+ ", orderingPolicy=" + getQueueOrderingPolicyConfigName()
|
||||||
+ ", priority=" + priority);
|
+ ", priority=" + priority
|
||||||
|
+ ", allowZeroCapacitySum=" + allowZeroCapacitySum);
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -192,13 +197,31 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
float delta = Math.abs(1.0f - childCapacities); // crude way to check
|
float delta = Math.abs(1.0f - childCapacities); // crude way to check
|
||||||
// allow capacities being set to 0, and enforce child 0 if parent is 0
|
|
||||||
if ((minResDefaultLabel.equals(Resources.none())
|
if (allowZeroCapacitySum) {
|
||||||
&& (queueCapacities.getCapacity() > 0) && (delta > PRECISION))
|
// If we allow zero capacity for children, only fail if:
|
||||||
|| ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
|
// Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f
|
||||||
|
//
|
||||||
|
// Therefore, child queues either add up to 0% or 100%.
|
||||||
|
//
|
||||||
|
// Current capacity doesn't matter, because we apply this logic
|
||||||
|
// regardless of whether the current capacity is zero or not.
|
||||||
|
if (minResDefaultLabel.equals(Resources.none())
|
||||||
|
&& (delta > PRECISION && childCapacities > PRECISION)) {
|
||||||
|
LOG.error("Capacity validation check is relaxed for"
|
||||||
|
+ " queue {}, but the capacity must be either 0% or 100%",
|
||||||
|
getQueuePath());
|
||||||
throw new IllegalArgumentException("Illegal" + " capacity of "
|
throw new IllegalArgumentException("Illegal" + " capacity of "
|
||||||
+ childCapacities + " for children of queue " + queueName);
|
+ childCapacities + " for children of queue " + queueName);
|
||||||
}
|
}
|
||||||
|
} else if ((minResDefaultLabel.equals(Resources.none())
|
||||||
|
&& (queueCapacities.getCapacity() > 0) && (delta > PRECISION))
|
||||||
|
|| ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
|
||||||
|
// allow capacities being set to 0, and enforce child 0 if parent is 0
|
||||||
|
throw new IllegalArgumentException("Illegal" + " capacity of "
|
||||||
|
+ childCapacities + " for children of queue " + queueName);
|
||||||
|
}
|
||||||
|
|
||||||
// check label capacities
|
// check label capacities
|
||||||
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
|
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
|
||||||
float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
|
float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
|
||||||
|
@ -226,7 +249,24 @@ public class ParentQueue extends AbstractCSQueue {
|
||||||
Resources.addTo(minRes, queue.getQueueResourceQuotas()
|
Resources.addTo(minRes, queue.getQueueResourceQuotas()
|
||||||
.getConfiguredMinResource(nodeLabel));
|
.getConfiguredMinResource(nodeLabel));
|
||||||
}
|
}
|
||||||
if ((minResDefaultLabel.equals(Resources.none()) && capacityByLabel > 0
|
|
||||||
|
float labelDelta = Math.abs(1.0f - sum);
|
||||||
|
|
||||||
|
if (allowZeroCapacitySum) {
|
||||||
|
// Similar to above, we only throw exception if
|
||||||
|
// Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f
|
||||||
|
if (minResDefaultLabel.equals(Resources.none())
|
||||||
|
&& capacityByLabel > 0
|
||||||
|
&& (labelDelta > PRECISION && sum > PRECISION)) {
|
||||||
|
LOG.error("Capacity validation check is relaxed for"
|
||||||
|
+ " queue {}, but the capacity must be either 0% or 100%",
|
||||||
|
getQueuePath());
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Illegal" + " capacity of " + sum + " for children of queue "
|
||||||
|
+ queueName + " for label=" + nodeLabel);
|
||||||
|
}
|
||||||
|
} else if ((minResDefaultLabel.equals(Resources.none())
|
||||||
|
&& capacityByLabel > 0
|
||||||
&& Math.abs(1.0f - sum) > PRECISION)
|
&& Math.abs(1.0f - sum) > PRECISION)
|
||||||
|| (capacityByLabel == 0) && (sum > 0)) {
|
|| (capacityByLabel == 0) && (sum > 0)) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
|
|
|
@ -19,12 +19,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.math.RoundingMode;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
|
||||||
|
@ -271,10 +270,22 @@ public class FSQueueConverter {
|
||||||
List<FSQueue> children = queue.getChildQueues();
|
List<FSQueue> children = queue.getChildQueues();
|
||||||
|
|
||||||
int totalWeight = getTotalWeight(children);
|
int totalWeight = getTotalWeight(children);
|
||||||
Map<String, BigDecimal> capacities = getCapacities(totalWeight, children);
|
Pair<Map<String, BigDecimal>, Boolean> result =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(
|
||||||
|
totalWeight, children, ruleHandler);
|
||||||
|
|
||||||
|
Map<String, BigDecimal> capacities = result.getLeft();
|
||||||
|
boolean shouldAllowZeroSumCapacity = result.getRight();
|
||||||
|
|
||||||
capacities
|
capacities
|
||||||
.forEach((key, value) -> capacitySchedulerConfig.set(PREFIX + key +
|
.forEach((key, value) -> capacitySchedulerConfig.set(PREFIX + key +
|
||||||
".capacity", value.toString()));
|
".capacity", value.toString()));
|
||||||
|
|
||||||
|
if (shouldAllowZeroSumCapacity) {
|
||||||
|
String queueName = queue.getName();
|
||||||
|
capacitySchedulerConfig.setBoolean(
|
||||||
|
PREFIX + queueName + ".allow-zero-capacity-sum", true);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -294,60 +305,6 @@ public class FSQueueConverter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Map<String, BigDecimal> getCapacities(int totalWeight,
|
|
||||||
List<FSQueue> children) {
|
|
||||||
final BigDecimal hundred = new BigDecimal(100).setScale(3);
|
|
||||||
|
|
||||||
if (children.size() == 0) {
|
|
||||||
return new HashMap<>();
|
|
||||||
} else if (children.size() == 1) {
|
|
||||||
Map<String, BigDecimal> capacity = new HashMap<>();
|
|
||||||
String queueName = children.get(0).getName();
|
|
||||||
capacity.put(queueName, hundred);
|
|
||||||
|
|
||||||
return capacity;
|
|
||||||
} else {
|
|
||||||
Map<String, BigDecimal> capacities = new HashMap<>();
|
|
||||||
|
|
||||||
children
|
|
||||||
.stream()
|
|
||||||
.forEach(queue -> {
|
|
||||||
BigDecimal total = new BigDecimal(totalWeight);
|
|
||||||
BigDecimal weight = new BigDecimal(queue.getWeight());
|
|
||||||
BigDecimal pct = weight
|
|
||||||
.setScale(5)
|
|
||||||
.divide(total, RoundingMode.HALF_UP)
|
|
||||||
.multiply(hundred)
|
|
||||||
.setScale(3);
|
|
||||||
|
|
||||||
if (Resources.none().compareTo(queue.getMinShare()) != 0) {
|
|
||||||
ruleHandler.handleMinResources();
|
|
||||||
}
|
|
||||||
|
|
||||||
capacities.put(queue.getName(), pct);
|
|
||||||
});
|
|
||||||
|
|
||||||
BigDecimal totalPct = new BigDecimal(0);
|
|
||||||
for (Map.Entry<String, BigDecimal> entry : capacities.entrySet()) {
|
|
||||||
totalPct = totalPct.add(entry.getValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
// fix last value if total != 100.000
|
|
||||||
if (!totalPct.equals(hundred)) {
|
|
||||||
BigDecimal tmp = new BigDecimal(0);
|
|
||||||
for (int i = 0; i < children.size() - 1; i++) {
|
|
||||||
tmp = tmp.add(capacities.get(children.get(i).getQueueName()));
|
|
||||||
}
|
|
||||||
|
|
||||||
String lastQueue = children.get(children.size() - 1).getName();
|
|
||||||
BigDecimal corrected = hundred.subtract(tmp);
|
|
||||||
capacities.put(lastQueue, corrected);
|
|
||||||
}
|
|
||||||
|
|
||||||
return capacities;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private int getTotalWeight(List<FSQueue> children) {
|
private int getTotalWeight(List<FSQueue> children) {
|
||||||
double sum = children
|
double sum = children
|
||||||
.stream()
|
.stream()
|
||||||
|
|
|
@ -0,0 +1,144 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.math.RoundingMode;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class that converts Fair Scheduler weights to capacities in
|
||||||
|
* percentages.
|
||||||
|
*
|
||||||
|
* It also makes sure that the sum of the capacities adds up to exactly 100.0.
|
||||||
|
*
|
||||||
|
* There is a special case when one or more queues have a capacity of 0. This
|
||||||
|
* can happen if the weight was originally 0 in the FS configuration. In
|
||||||
|
* this case, we need an extra queue with a capacity of 100.0 to have a valid
|
||||||
|
* CS configuration.
|
||||||
|
*/
|
||||||
|
final class WeightToCapacityConversionUtil {
|
||||||
|
private static final BigDecimal HUNDRED = new BigDecimal(100).setScale(3);
|
||||||
|
private static final BigDecimal ZERO = new BigDecimal(0).setScale(3);
|
||||||
|
|
||||||
|
private WeightToCapacityConversionUtil() {
|
||||||
|
// no instances
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static Pair<Map<String, BigDecimal>, Boolean> getCapacities(int totalWeight,
|
||||||
|
List<FSQueue> children, FSConfigToCSConfigRuleHandler ruleHandler) {
|
||||||
|
|
||||||
|
if (children.size() == 0) {
|
||||||
|
return Pair.of(new HashMap<>(), false);
|
||||||
|
} else if (children.size() == 1) {
|
||||||
|
Map<String, BigDecimal> capacity = new HashMap<>();
|
||||||
|
String queueName = children.get(0).getName();
|
||||||
|
capacity.put(queueName, HUNDRED);
|
||||||
|
|
||||||
|
return Pair.of(capacity, false);
|
||||||
|
} else {
|
||||||
|
Map<String, BigDecimal> capacities = new HashMap<>();
|
||||||
|
|
||||||
|
children
|
||||||
|
.stream()
|
||||||
|
.forEach(queue -> {
|
||||||
|
BigDecimal pct;
|
||||||
|
|
||||||
|
if (totalWeight == 0) {
|
||||||
|
pct = ZERO;
|
||||||
|
} else {
|
||||||
|
BigDecimal total = new BigDecimal(totalWeight);
|
||||||
|
BigDecimal weight = new BigDecimal(queue.getWeight());
|
||||||
|
pct = weight
|
||||||
|
.setScale(5)
|
||||||
|
.divide(total, RoundingMode.HALF_UP)
|
||||||
|
.multiply(HUNDRED)
|
||||||
|
.setScale(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (Resources.none().compareTo(queue.getMinShare()) != 0) {
|
||||||
|
ruleHandler.handleMinResources();
|
||||||
|
}
|
||||||
|
|
||||||
|
capacities.put(queue.getName(), pct);
|
||||||
|
});
|
||||||
|
|
||||||
|
BigDecimal totalPct = ZERO;
|
||||||
|
for (Map.Entry<String, BigDecimal> entry : capacities.entrySet()) {
|
||||||
|
totalPct = totalPct.add(entry.getValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
// fix capacities if total != 100.000
|
||||||
|
boolean shouldAllowZeroSumCapacity = false;
|
||||||
|
if (!totalPct.equals(HUNDRED)) {
|
||||||
|
shouldAllowZeroSumCapacity = fixCapacities(capacities, totalPct);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Pair.of(capacities, shouldAllowZeroSumCapacity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
static boolean fixCapacities(Map<String, BigDecimal> capacities,
|
||||||
|
BigDecimal totalPct) {
|
||||||
|
final BigDecimal hundred = new BigDecimal(100).setScale(3);
|
||||||
|
boolean shouldAllowZeroSumCapacity = false;
|
||||||
|
|
||||||
|
// Sort the list so we'll adjust the highest capacity value,
|
||||||
|
// because that will affected less by a small change.
|
||||||
|
// Also, it's legal to have weight = 0 and we have to avoid picking
|
||||||
|
// that value as well.
|
||||||
|
List<Map.Entry<String, BigDecimal>> sortedEntries = capacities
|
||||||
|
.entrySet()
|
||||||
|
.stream()
|
||||||
|
.sorted(new Comparator<Map.Entry<String, BigDecimal>>() {
|
||||||
|
@Override
|
||||||
|
public int compare(Map.Entry<String, BigDecimal> e1,
|
||||||
|
Map.Entry<String, BigDecimal> e2) {
|
||||||
|
return e2.getValue().compareTo(e1.getValue());
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect(Collectors.toList());
|
||||||
|
|
||||||
|
String highestCapacityQueue = sortedEntries.get(0).getKey();
|
||||||
|
BigDecimal highestCapacity = sortedEntries.get(0).getValue();
|
||||||
|
|
||||||
|
if (highestCapacity.equals(ZERO)) {
|
||||||
|
// need to set allow-zero-capacity-sum on this queue
|
||||||
|
// because we have zero weights on this level
|
||||||
|
shouldAllowZeroSumCapacity = true;
|
||||||
|
} else {
|
||||||
|
BigDecimal diff = hundred.subtract(totalPct);
|
||||||
|
BigDecimal correctedHighest = highestCapacity.add(diff);
|
||||||
|
capacities.put(highestCapacityQueue, correctedHighest);
|
||||||
|
}
|
||||||
|
|
||||||
|
return shouldAllowZeroSumCapacity;
|
||||||
|
}
|
||||||
|
}
|
|
@ -426,6 +426,7 @@ public class TestParentQueue {
|
||||||
private static final String B1 = "b1";
|
private static final String B1 = "b1";
|
||||||
private static final String B2 = "b2";
|
private static final String B2 = "b2";
|
||||||
private static final String B3 = "b3";
|
private static final String B3 = "b3";
|
||||||
|
private static final String B4 = "b4";
|
||||||
|
|
||||||
private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
|
private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
|
||||||
|
|
||||||
|
@ -677,6 +678,63 @@ public class TestParentQueue {
|
||||||
TestUtils.spyHook);
|
TestUtils.spyHook);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueCapacitySettingParentZeroChildren100pctZeroSumAllowed()
|
||||||
|
throws Exception {
|
||||||
|
// Setup queue configs
|
||||||
|
setupMultiLevelQueues(csConf);
|
||||||
|
|
||||||
|
// set parent capacity to 0 when child is 100
|
||||||
|
// and allow zero capacity sum
|
||||||
|
csConf.setCapacity(Q_B, 0);
|
||||||
|
csConf.setCapacity(Q_A, 60);
|
||||||
|
csConf.setAllowZeroCapacitySum(Q_B, true);
|
||||||
|
CSQueueStore queues = new CSQueueStore();
|
||||||
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
|
TestUtils.spyHook);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IllegalArgumentException.class)
|
||||||
|
public void testQueueCapacitySettingParentZeroChildren50pctZeroSumAllowed()
|
||||||
|
throws Exception {
|
||||||
|
// Setup queue configs
|
||||||
|
setupMultiLevelQueues(csConf);
|
||||||
|
|
||||||
|
// set parent capacity to 0 when sum(children) is 50
|
||||||
|
// and allow zero capacity sum
|
||||||
|
csConf.setCapacity(Q_B, 0);
|
||||||
|
csConf.setCapacity(Q_A, 100);
|
||||||
|
csConf.setCapacity(Q_B + "." + B1, 10);
|
||||||
|
csConf.setCapacity(Q_B + "." + B2, 20);
|
||||||
|
csConf.setCapacity(Q_B + "." + B3, 20);
|
||||||
|
csConf.setAllowZeroCapacitySum(Q_B, true);
|
||||||
|
CSQueueStore queues = new CSQueueStore();
|
||||||
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
|
TestUtils.spyHook);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testQueueCapacitySettingParentNonZeroChildrenZeroSumAllowed()
|
||||||
|
throws Exception {
|
||||||
|
// Setup queue configs
|
||||||
|
setupMultiLevelQueues(csConf);
|
||||||
|
|
||||||
|
// set parent capacity to 10 when sum(children) is 0
|
||||||
|
// and allow zero capacity sum
|
||||||
|
csConf.setCapacity(Q_B, 10);
|
||||||
|
csConf.setCapacity(Q_A, 50);
|
||||||
|
csConf.setCapacity(Q_B + "." + B1, 0);
|
||||||
|
csConf.setCapacity(Q_B + "." + B2, 0);
|
||||||
|
csConf.setCapacity(Q_B + "." + B3, 0);
|
||||||
|
csConf.setAllowZeroCapacitySum(Q_B, true);
|
||||||
|
CSQueueStore queues = new CSQueueStore();
|
||||||
|
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
|
||||||
|
CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
|
TestUtils.spyHook);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueCapacityZero() throws Exception {
|
public void testQueueCapacityZero() throws Exception {
|
||||||
// Setup queue configs
|
// Setup queue configs
|
||||||
|
|
|
@ -68,7 +68,10 @@ public class TestFSQueueConverter {
|
||||||
"root.admins.alice",
|
"root.admins.alice",
|
||||||
"root.admins.bob",
|
"root.admins.bob",
|
||||||
"root.users.joe",
|
"root.users.joe",
|
||||||
"root.users.john");
|
"root.users.john",
|
||||||
|
"root.misc",
|
||||||
|
"root.misc.a",
|
||||||
|
"root.misc.b");
|
||||||
|
|
||||||
private static final String FILE_PREFIX = "file:";
|
private static final String FILE_PREFIX = "file:";
|
||||||
private static final String FAIR_SCHEDULER_XML =
|
private static final String FAIR_SCHEDULER_XML =
|
||||||
|
@ -148,7 +151,7 @@ public class TestFSQueueConverter {
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root children
|
// root children
|
||||||
assertEquals("root children", "default,admins,users",
|
assertEquals("root children", "default,admins,users,misc",
|
||||||
csConfig.get(PREFIX + "root.queues"));
|
csConfig.get(PREFIX + "root.queues"));
|
||||||
|
|
||||||
// root.admins children
|
// root.admins children
|
||||||
|
@ -167,7 +170,8 @@ public class TestFSQueueConverter {
|
||||||
Sets.newHashSet("root",
|
Sets.newHashSet("root",
|
||||||
"root.default",
|
"root.default",
|
||||||
"root.admins",
|
"root.admins",
|
||||||
"root.users"));
|
"root.users",
|
||||||
|
"root.misc"));
|
||||||
|
|
||||||
assertNoValueForQueues(leafs, ".queues", csConfig);
|
assertNoValueForQueues(leafs, ".queues", csConfig);
|
||||||
}
|
}
|
||||||
|
@ -285,6 +289,29 @@ public class TestFSQueueConverter {
|
||||||
csConfig.get(PREFIX + "root.admins.alice.capacity"));
|
csConfig.get(PREFIX + "root.admins.alice.capacity"));
|
||||||
assertEquals("root.admins.bob capacity", "25.000",
|
assertEquals("root.admins.bob capacity", "25.000",
|
||||||
csConfig.get(PREFIX + "root.admins.bob.capacity"));
|
csConfig.get(PREFIX + "root.admins.bob.capacity"));
|
||||||
|
|
||||||
|
// root.misc
|
||||||
|
assertEquals("root.misc capacity", "0.000",
|
||||||
|
csConfig.get(PREFIX + "root.misc.capacity"));
|
||||||
|
assertEquals("root.misc.a capacity", "0.000",
|
||||||
|
csConfig.get(PREFIX + "root.misc.a.capacity"));
|
||||||
|
assertEquals("root.misc.b capacity", "0.000",
|
||||||
|
csConfig.get(PREFIX + "root.misc.b.capacity"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testZeroSumCapacityValidation() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
|
Set<String> noZeroSumAllowedQueues = Sets.difference(ALL_QUEUES,
|
||||||
|
Sets.newHashSet("root.misc"));
|
||||||
|
assertNoValueForQueues(noZeroSumAllowedQueues, ".allow-zero-capacity-sum",
|
||||||
|
csConfig);
|
||||||
|
|
||||||
|
assertTrue("root.misc allow zero capacities", csConfig.getBoolean(
|
||||||
|
PREFIX + "root.misc.allow-zero-capacity-sum", false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -0,0 +1,194 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
public class TestWeightToCapacityConversionUtil {
|
||||||
|
@Mock
|
||||||
|
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSingleWeightConversion() {
|
||||||
|
List<FSQueue> queues = createFSQueues(1);
|
||||||
|
Pair<Map<String, BigDecimal>, Boolean> conversion =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(1, queues, ruleHandler);
|
||||||
|
|
||||||
|
assertFalse("Capacity zerosum allowed", conversion.getRight());
|
||||||
|
assertEquals("Capacity", new BigDecimal("100.000"),
|
||||||
|
conversion.getLeft().get("root.a"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoChildQueueConversion() {
|
||||||
|
List<FSQueue> queues = new ArrayList<>();
|
||||||
|
Pair<Map<String, BigDecimal>, Boolean> conversion =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(1, queues, ruleHandler);
|
||||||
|
|
||||||
|
assertEquals("Converted items", 0, conversion.getLeft().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiWeightConversion() {
|
||||||
|
List<FSQueue> queues = createFSQueues(1, 2, 3);
|
||||||
|
|
||||||
|
Pair<Map<String, BigDecimal>, Boolean> conversion =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(6, queues, ruleHandler);
|
||||||
|
|
||||||
|
Map<String, BigDecimal> capacities = conversion.getLeft();
|
||||||
|
|
||||||
|
assertEquals("Number of queues", 3, capacities.size());
|
||||||
|
// this is no fixing - it's the result of BigDecimal rounding
|
||||||
|
assertEquals("root.a capacity", new BigDecimal("16.667"),
|
||||||
|
capacities.get("root.a"));
|
||||||
|
assertEquals("root.b capacity", new BigDecimal("33.333"),
|
||||||
|
capacities.get("root.b"));
|
||||||
|
assertEquals("root.c capacity", new BigDecimal("50.000"),
|
||||||
|
capacities.get("root.c"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiWeightConversionWhenOfThemIsZero() {
|
||||||
|
List<FSQueue> queues = createFSQueues(0, 1, 1);
|
||||||
|
|
||||||
|
Pair<Map<String, BigDecimal>, Boolean> conversion =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(2, queues, ruleHandler);
|
||||||
|
|
||||||
|
Map<String, BigDecimal> capacities = conversion.getLeft();
|
||||||
|
|
||||||
|
assertFalse("Capacity zerosum allowed", conversion.getRight());
|
||||||
|
assertEquals("Number of queues", 3, capacities.size());
|
||||||
|
assertEquals("root.a capacity", new BigDecimal("0.000"),
|
||||||
|
capacities.get("root.a"));
|
||||||
|
assertEquals("root.b capacity", new BigDecimal("50.000"),
|
||||||
|
capacities.get("root.b"));
|
||||||
|
assertEquals("root.c capacity", new BigDecimal("50.000"),
|
||||||
|
capacities.get("root.c"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMultiWeightConversionWhenAllOfThemAreZero() {
|
||||||
|
List<FSQueue> queues = createFSQueues(0, 0, 0);
|
||||||
|
|
||||||
|
Pair<Map<String, BigDecimal>, Boolean> conversion =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(0, queues, ruleHandler);
|
||||||
|
|
||||||
|
Map<String, BigDecimal> capacities = conversion.getLeft();
|
||||||
|
|
||||||
|
assertEquals("Number of queues", 3, capacities.size());
|
||||||
|
assertTrue("Capacity zerosum allowed", conversion.getRight());
|
||||||
|
assertEquals("root.a capacity", new BigDecimal("0.000"),
|
||||||
|
capacities.get("root.a"));
|
||||||
|
assertEquals("root.b capacity", new BigDecimal("0.000"),
|
||||||
|
capacities.get("root.b"));
|
||||||
|
assertEquals("root.c capacity", new BigDecimal("0.000"),
|
||||||
|
capacities.get("root.c"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCapacityFixingWithThreeQueues() {
|
||||||
|
List<FSQueue> queues = createFSQueues(1, 1, 1);
|
||||||
|
|
||||||
|
Pair<Map<String, BigDecimal>, Boolean> conversion =
|
||||||
|
WeightToCapacityConversionUtil.getCapacities(3, queues, ruleHandler);
|
||||||
|
|
||||||
|
Map<String, BigDecimal> capacities = conversion.getLeft();
|
||||||
|
assertEquals("Number of queues", 3, capacities.size());
|
||||||
|
assertEquals("root.a capacity", new BigDecimal("33.334"),
|
||||||
|
capacities.get("root.a"));
|
||||||
|
assertEquals("root.b capacity", new BigDecimal("33.333"),
|
||||||
|
capacities.get("root.b"));
|
||||||
|
assertEquals("root.c capacity", new BigDecimal("33.333"),
|
||||||
|
capacities.get("root.c"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCapacityFixingWhenTotalCapacityIsGreaterThanHundred() {
|
||||||
|
Map<String, BigDecimal> capacities = new HashMap<>();
|
||||||
|
capacities.put("root.a", new BigDecimal("50.001"));
|
||||||
|
capacities.put("root.b", new BigDecimal("25.500"));
|
||||||
|
capacities.put("root.c", new BigDecimal("25.500"));
|
||||||
|
|
||||||
|
testCapacityFixing(capacities, new BigDecimal("100.001"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCapacityFixWhenTotalCapacityIsLessThanHundred() {
|
||||||
|
Map<String, BigDecimal> capacities = new HashMap<>();
|
||||||
|
capacities.put("root.a", new BigDecimal("49.999"));
|
||||||
|
capacities.put("root.b", new BigDecimal("25.500"));
|
||||||
|
capacities.put("root.c", new BigDecimal("25.500"));
|
||||||
|
|
||||||
|
testCapacityFixing(capacities, new BigDecimal("99.999"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testCapacityFixing(Map<String, BigDecimal> capacities,
|
||||||
|
BigDecimal total) {
|
||||||
|
// Note: we call fixCapacities() directly because it makes
|
||||||
|
// testing easier
|
||||||
|
boolean needCapacityValidationRelax =
|
||||||
|
WeightToCapacityConversionUtil.fixCapacities(capacities,
|
||||||
|
total);
|
||||||
|
|
||||||
|
assertFalse("Capacity zerosum allowed", needCapacityValidationRelax);
|
||||||
|
assertEquals("root.a capacity", new BigDecimal("50.000"),
|
||||||
|
capacities.get("root.a"));
|
||||||
|
assertEquals("root.b capacity", new BigDecimal("25.500"),
|
||||||
|
capacities.get("root.b"));
|
||||||
|
assertEquals("root.c capacity", new BigDecimal("25.500"),
|
||||||
|
capacities.get("root.c"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<FSQueue> createFSQueues(int... weights){
|
||||||
|
char current = 'a';
|
||||||
|
|
||||||
|
List<FSQueue> queues = new ArrayList<>();
|
||||||
|
|
||||||
|
for (int w : weights) {
|
||||||
|
FSQueue queue = mock(FSQueue.class);
|
||||||
|
when(queue.getWeight()).thenReturn((float)w);
|
||||||
|
when(queue.getName()).thenReturn(
|
||||||
|
"root." + new String(new char[] {current}));
|
||||||
|
when(queue.getMinShare()).thenReturn(Resources.none());
|
||||||
|
current++;
|
||||||
|
queues.add(queue);
|
||||||
|
}
|
||||||
|
|
||||||
|
return queues;
|
||||||
|
}
|
||||||
|
}
|
|
@ -73,6 +73,15 @@
|
||||||
<maxAMShare>-1.0</maxAMShare>
|
<maxAMShare>-1.0</maxAMShare>
|
||||||
</queue>
|
</queue>
|
||||||
</queue>
|
</queue>
|
||||||
|
<queue name="misc" type="parent">
|
||||||
|
<weight>0</weight>
|
||||||
|
<queue name="a">
|
||||||
|
<weight>0</weight>
|
||||||
|
</queue>
|
||||||
|
<queue name="b">
|
||||||
|
<weight>0</weight>
|
||||||
|
</queue>
|
||||||
|
</queue>
|
||||||
</queue>
|
</queue>
|
||||||
<user name="alice">
|
<user name="alice">
|
||||||
<maxRunningApps>30</maxRunningApps>
|
<maxRunningApps>30</maxRunningApps>
|
||||||
|
|
Loading…
Reference in New Issue