YARN-10168. FS-CS Converter: tool doesn't handle min/max resource conversion correctly. Contributed by Peter Bacsko

Szilard Nemeth 2020-03-10 16:07:46 +01:00
parent ede05b19d1
commit 9314ef947f
7 changed files with 96 additions and 170 deletions
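In effect, after this change the converter no longer translates <minResources>/<maxResources> into Capacity Scheduler settings: both now only trigger the new handleMinResources()/handleMaxResources() rule-handler actions (warn or abort, depending on the configured action), maximum-capacity is always emitted as "100", and child queue capacities come solely from the weight-based percentage calculation in getCapacities(). The standalone sketch below illustrates that calculation, including the correction that adjusts the last queue so the column totals exactly 100.000; the class name, queue names, weights and the HALF_UP rounding mode are assumptions made for illustration, not code taken from this patch.

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Standalone illustration only; not the patched class. Queue names, weights
// and the HALF_UP rounding mode are assumptions made for this sketch.
public class WeightToCapacitySketch {

  public static void main(String[] args) {
    // Hypothetical FS child queues with their <weight> values.
    Map<String, Integer> weights = new LinkedHashMap<>();
    weights.put("root.users.alice", 1);
    weights.put("root.users.bob", 1);
    weights.put("root.users.charlie", 1);

    final BigDecimal hundred = new BigDecimal("100.000");
    int totalWeight = weights.values().stream().mapToInt(Integer::intValue).sum();

    // weight / totalWeight * 100, scaled to three decimals.
    Map<String, BigDecimal> capacities = new LinkedHashMap<>();
    weights.forEach((queue, weight) -> capacities.put(queue,
        BigDecimal.valueOf(weight)
            .divide(BigDecimal.valueOf(totalWeight), 5, RoundingMode.HALF_UP)
            .multiply(hundred)
            .setScale(3, RoundingMode.HALF_UP)));

    // Rounding can leave the column slightly off (3 x 33.333 = 99.999), so the
    // last queue is recomputed as 100.000 minus the sum of the others.
    BigDecimal total = capacities.values().stream()
        .reduce(BigDecimal.ZERO, BigDecimal::add);
    if (total.compareTo(hundred) != 0) {
      List<String> names = new ArrayList<>(capacities.keySet());
      String last = names.get(names.size() - 1);
      BigDecimal others = BigDecimal.ZERO;
      for (int i = 0; i < names.size() - 1; i++) {
        others = others.add(capacities.get(names.get(i)));
      }
      capacities.put(last, hundred.subtract(others));
    }

    // Prints e.g. root.users.charlie.capacity = 33.334
    capacities.forEach((queue, pct) ->
        System.out.println(queue + ".capacity = " + pct));
  }
}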

@@ -55,6 +55,12 @@ public class FSConfigToCSConfigRuleHandler {
public static final String MAX_CHILD_CAPACITY =
"maxChildCapacity.action";
public static final String MAX_RESOURCES =
"maxResources.action";
public static final String MIN_RESOURCES =
"minResources.action";
public static final String USER_MAX_RUNNING_APPS =
"userMaxRunningApps.action";
@@ -118,6 +124,8 @@ public FSConfigToCSConfigRuleHandler(ConversionOptions conversionOptions) {
public void initPropertyActions() {
setActionForProperty(MAX_CAPACITY_PERCENTAGE);
setActionForProperty(MAX_CHILD_CAPACITY);
setActionForProperty(MAX_RESOURCES);
setActionForProperty(MIN_RESOURCES);
setActionForProperty(USER_MAX_RUNNING_APPS);
setActionForProperty(USER_MAX_APPS_DEFAULT);
setActionForProperty(DYNAMIC_MAX_ASSIGN);
@@ -138,6 +146,14 @@ public void handleMaxChildCapacity() {
handle(MAX_CHILD_CAPACITY, "<maxChildResources>", null);
}
public void handleMaxResources() {
handle(MAX_RESOURCES, "<maxResources>", null);
}
public void handleMinResources() {
handle(MIN_RESOURCES, "<minResources>", null);
}
public void handleChildQueueCount(String queue, int count) {
String value = properties.getProperty(MAX_CHILD_QUEUE_LIMIT);
if (value != null) {

@@ -27,7 +27,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
@@ -36,7 +35,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -51,6 +49,7 @@ public class FSQueueConverter {
private Configuration capacitySchedulerConfig;
private final boolean preemptionEnabled;
private final boolean sizeBasedWeight;
@SuppressWarnings("unused")
private final Resource clusterResource;
private final float queueMaxAMShareDefault;
private final boolean autoCreateChildQueues;
@@ -168,47 +167,13 @@ private void emitMaximumCapacity(String queueName, FSQueue queue) {
ConfigurableResource rawMaxShare = queue.getRawMaxShare();
final Resource maxResource = rawMaxShare.getResource();
long memSize = 0;
long vCores = 0;
boolean defined = false;
if (maxResource == null) {
if (rawMaxShare.getPercentages() != null) {
if (clusterResource == null) {
String message = String.format(
"<maxResources> defined in percentages for" +
" queue %s, but cluster resource parameter is not" +
" defined via CLI!", queueName);
conversionOptions.handleConversionError(message);
return;
}
ruleHandler.handleMaxCapacityPercentage(queueName);
double[] percentages = rawMaxShare.getPercentages();
int memIndex = ResourceUtils.getResourceTypeIndex().get("memory-mb");
int vcoreIndex = ResourceUtils.getResourceTypeIndex().get("vcores");
memSize = (long) (percentages[memIndex] *
clusterResource.getMemorySize());
vCores = (long) (percentages[vcoreIndex] *
clusterResource.getVirtualCores());
defined = true;
} else {
conversionOptions.handlePreconditionError(
"Illegal ConfigurableResource object = " + rawMaxShare);
}
} else if (isNotUnboundedResource(maxResource)) {
memSize = maxResource.getMemorySize();
vCores = maxResource.getVirtualCores();
defined = true;
if ((maxResource == null && rawMaxShare.getPercentages() != null)
|| isNotUnboundedResource(maxResource)) {
ruleHandler.handleMaxResources();
}
if (defined) {
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
String.format("[memory=%d, vcores=%d]", memSize, vCores));
}
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
"100");
}
/**
@@ -330,7 +295,7 @@ private void emitChildCapacity(FSQueue queue) {
List<FSQueue> children = queue.getChildQueues();
int totalWeight = getTotalWeight(children);
Map<String, Capacity> capacities = getCapacities(totalWeight, children);
Map<String, BigDecimal> capacities = getCapacities(totalWeight, children);
capacities
.forEach((key, value) -> capacitySchedulerConfig.set(PREFIX + key +
".capacity", value.toString()));
@@ -353,23 +318,21 @@ private void checkMaxChildCapacitySetting(FSQueue queue) {
}
}
private Map<String, Capacity> getCapacities(int totalWeight,
private Map<String, BigDecimal> getCapacities(int totalWeight,
List<FSQueue> children) {
final BigDecimal hundred = new BigDecimal(100).setScale(3);
if (children.size() == 0) {
return new HashMap<>();
} else if (children.size() == 1) {
Map<String, Capacity> capacity = new HashMap<>();
Map<String, BigDecimal> capacity = new HashMap<>();
String queueName = children.get(0).getName();
capacity.put(queueName, Capacity.newCapacity(hundred));
capacity.put(queueName, hundred);
return capacity;
} else {
Map<String, Capacity> capacities = new HashMap<>();
Map<String, BigDecimal> bdCapacities = new HashMap<>();
Map<String, BigDecimal> capacities = new HashMap<>();
MutableBoolean needVerifySum = new MutableBoolean(true);
children
.stream()
.forEach(queue -> {
@@ -381,48 +344,28 @@ private Map<String, Capacity> getCapacities(int totalWeight,
.multiply(hundred)
.setScale(3);
// <minResources> defined?
if (Resources.none().compareTo(queue.getMinShare()) != 0) {
needVerifySum.setFalse();
/* TODO: Needs discussion.
*
* Probably it's not entirely correct this way!
* Eg. root.queue1 in FS translates to 33%
* capacity, but minResources is defined as 1vcore,8GB
* which is less than 33%.
*
* Therefore max(calculatedCapacity, minResource) is
* more sound.
*/
Resource minShare = queue.getMinShare();
// TODO: in Phase-2, we have to deal with other resources as well
String capacity = String.format("[memory=%d,vcores=%d]",
minShare.getMemorySize(), minShare.getVirtualCores());
capacities.put(queue.getName(), Capacity.newCapacity(capacity));
} else {
capacities.put(queue.getName(), Capacity.newCapacity(pct));
bdCapacities.put(queue.getName(), pct);
ruleHandler.handleMinResources();
}
capacities.put(queue.getName(), pct);
});
if (needVerifySum.isTrue()) {
BigDecimal totalPct = new BigDecimal(0);
for (Map.Entry<String, BigDecimal> entry : bdCapacities.entrySet()) {
totalPct = totalPct.add(entry.getValue());
BigDecimal totalPct = new BigDecimal(0);
for (Map.Entry<String, BigDecimal> entry : capacities.entrySet()) {
totalPct = totalPct.add(entry.getValue());
}
// fix last value if total != 100.000
if (!totalPct.equals(hundred)) {
BigDecimal tmp = new BigDecimal(0);
for (int i = 0; i < children.size() - 2; i++) {
tmp = tmp.add(capacities.get(children.get(i).getQueueName()));
}
// fix last value if total != 100.000
if (!totalPct.equals(hundred)) {
BigDecimal tmp = new BigDecimal(0);
for (int i = 0; i < children.size() - 2; i++) {
tmp = tmp.add(bdCapacities.get(children.get(i).getQueueName()));
}
String lastQueue = children.get(children.size() - 1).getName();
BigDecimal corrected = hundred.subtract(tmp);
capacities.put(lastQueue, Capacity.newCapacity(corrected));
}
String lastQueue = children.get(children.size() - 1).getName();
BigDecimal corrected = hundred.subtract(tmp);
capacities.put(lastQueue, corrected);
}
return capacities;
@@ -445,38 +388,4 @@ private String getQueueShortName(String queueName) {
private boolean isNotUnboundedResource(Resource res) {
return Resources.unbounded().compareTo(res) != 0;
}
/*
* Represents a queue capacity in either percentage
* or in absolute resources
*/
private static class Capacity {
private BigDecimal percentage;
private String absoluteResource;
public static Capacity newCapacity(BigDecimal pct) {
Capacity capacity = new Capacity();
capacity.percentage = pct;
capacity.absoluteResource = null;
return capacity;
}
public static Capacity newCapacity(String absoluteResource) {
Capacity capacity = new Capacity();
capacity.percentage = null;
capacity.absoluteResource = absoluteResource;
return capacity;
}
@Override
public String toString() {
if (percentage != null) {
return percentage.toString();
} else {
return absoluteResource;
}
}
}
}

@@ -20,6 +20,8 @@
import java.io.File;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -36,9 +38,15 @@ public class TestConvertedConfigValidator {
@Before
public void setup() {
QueueMetrics.clearQueueMetrics();
validator = new ConvertedConfigValidator();
}
@After
public void after() {
QueueMetrics.clearQueueMetrics();
}
@Test
public void testValidationPassed() throws Exception {
validator.validateConvertedConfig(CONFIG_DIR_PASSES);

@@ -300,19 +300,6 @@ public void testConvertFSConfigPctModeUsedAndClusterResourceDefined()
converter.getClusterResource());
}
@Test
public void testConvertFSConfigPctModeUsedAndClusterResourceNotDefined()
throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.build();
expectedException.expect(ConversionException.class);
expectedException.expectMessage("cluster resource parameter" +
" is not defined via CLI");
converter.convert(params);
}
@Test
public void testConvertFSConfigurationClusterResourceInvalid()
throws Exception {

@@ -18,6 +18,8 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.DYNAMIC_MAX_ASSIGN;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CAPACITY_PERCENTAGE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_RESOURCES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MIN_RESOURCES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_QUEUE_LIMIT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.QUEUE_AUTO_CREATE;
@@ -69,6 +71,8 @@ public void testInitPropertyActionsToWarning() throws IOException {
ruleHandler.handleDynamicMaxAssign();
ruleHandler.handleMaxCapacityPercentage("test");
ruleHandler.handleMaxChildCapacity();
ruleHandler.handleMinResources();
ruleHandler.handleMaxResources();
ruleHandler.handleQueueAutoCreate("test");
ruleHandler.handleReservationSystem();
ruleHandler.handleSpecifiedNotFirstRule();
@@ -81,6 +85,8 @@ public void testAllRulesWarning() throws IOException {
Properties rules = new Properties();
rules.put(DYNAMIC_MAX_ASSIGN, WARNING);
rules.put(MAX_CAPACITY_PERCENTAGE, WARNING);
rules.put(MAX_RESOURCES, WARNING);
rules.put(MIN_RESOURCES, WARNING);
rules.put(MAX_CHILD_CAPACITY, WARNING);
rules.put(QUEUE_AUTO_CREATE, WARNING);
rules.put(RESERVATION_SYSTEM, WARNING);
@@ -95,6 +101,8 @@ public void testAllRulesWarning() throws IOException {
ruleHandler.handleDynamicMaxAssign();
ruleHandler.handleMaxCapacityPercentage("test");
ruleHandler.handleMaxChildCapacity();
ruleHandler.handleMinResources();
ruleHandler.handleMaxResources();
ruleHandler.handleQueueAutoCreate("test");
ruleHandler.handleReservationSystem();
ruleHandler.handleSpecifiedNotFirstRule();
@@ -108,6 +116,8 @@ public void testAllRulesAbort() throws IOException {
rules.put(DYNAMIC_MAX_ASSIGN, ABORT);
rules.put(MAX_CAPACITY_PERCENTAGE, ABORT);
rules.put(MAX_CHILD_CAPACITY, ABORT);
rules.put(MAX_RESOURCES, ABORT);
rules.put(MIN_RESOURCES, ABORT);
rules.put(QUEUE_AUTO_CREATE, ABORT);
rules.put(RESERVATION_SYSTEM, ABORT);
rules.put(SPECIFIED_NOT_FIRST, ABORT);
@@ -125,6 +135,8 @@ public void testAllRulesAbort() throws IOException {
expectAbort(() -> ruleHandler.handleDynamicMaxAssign());
expectAbort(() -> ruleHandler.handleMaxCapacityPercentage("test"));
expectAbort(() -> ruleHandler.handleMaxChildCapacity());
expectAbort(() -> ruleHandler.handleMaxResources());
expectAbort(() -> ruleHandler.handleMinResources());
expectAbort(() -> ruleHandler.handleQueueAutoCreate("test"));
expectAbort(() -> ruleHandler.handleReservationSystem());
expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule());

@@ -79,7 +79,7 @@ private static String prepareFileName(String f) {
}
private FSQueueConverter converter;
private Configuration config;
private Configuration yarnConfig;
private Configuration csConfig;
private FairScheduler fs;
private FSQueue rootQueue;
@@ -95,9 +95,10 @@ private static String prepareFileName(String f) {
@Before
public void setup() {
config = new Configuration(false);
config.set(FairSchedulerConfiguration.ALLOCATION_FILE, FAIR_SCHEDULER_XML);
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
yarnConfig = new Configuration(false);
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FAIR_SCHEDULER_XML);
yarnConfig.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
csConfig = new Configuration(false);
dryRunResultHolder = new DryRunResultHolder();
conversionOptions =
@@ -123,7 +124,7 @@ private FairScheduler createFairScheduler() {
FairScheduler fairScheduler = new FairScheduler();
fairScheduler.setRMContext(ctx);
fairScheduler.init(config);
fairScheduler.init(yarnConfig);
return fairScheduler;
}
@@ -187,7 +188,7 @@ public void testConvertQueueHierarchyWithSameLeafQueues() throws Exception {
String absolutePath =
new File("src/test/resources/fair-scheduler-sameleafqueue.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
@@ -317,19 +318,17 @@ public void testQueueMaximumCapacity() {
converter.convertQueueHierarchy(rootQueue);
assertEquals("root.users.joe maximum capacity", "[memory=8192, vcores=8]",
csConfig.get(PREFIX + "root.users.joe.maximum-capacity"));
assertEquals("root.admins.bob maximum capacity", "[memory=8192, vcores=2]",
csConfig.get(PREFIX + "root.admins.bob.maximum-capacity"));
assertEquals("root.admins.alice maximum capacity",
"[memory=16384, vcores=4]",
csConfig.get(PREFIX + "root.admins.alice.maximum-capacity"));
assertValueForQueues(ALL_QUEUES, ".maximum-capacity", csConfig, "100");
verify(ruleHandler, times(3)).handleMaxResources();
}
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.users.joe",
"root.admins.bob",
"root.admins.alice"));
assertNoValueForQueues(remaining, ".maximum-capacity", csConfig);
@Test
public void testQueueMinimumCapacity() {
converter = builder.build();
converter.convertQueueHierarchy(rootQueue);
verify(ruleHandler, times(2)).handleMinResources();
}
@Test
@@ -397,7 +396,7 @@ public void testQueueOrderingPolicy() throws Exception {
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
@@ -434,7 +433,7 @@ public void testQueueUnsupportedMixedOrderingPolicy() throws IOException {
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy-mixed.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
@@ -465,7 +464,8 @@ public void testReservationSystemNotSupported() {
Mockito.doThrow(new UnsupportedPropertyException("maxCapacity"))
.when(ruleHandler).handleMaxChildCapacity();
config.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
yarnConfig.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
true);
converter.convertQueueHierarchy(rootQueue);
}
@@ -475,7 +475,7 @@ public void testDryRunWithMultipleLeafQueueNames() throws IOException {
String absolutePath =
new File("src/test/resources/fair-scheduler-sameleafqueue.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
@@ -492,23 +492,6 @@ public void testDryRunWithMultipleLeafQueueNames() throws IOException {
error.contains("Leaf queues must be unique"));
}
@Test
public void testDryRunWithNoClusterResource() {
builder.withClusterResource(null);
prepareDryRunConverter();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
assertEquals("Dry run errors", 1, dryRunResultHolder.getErrors().size());
assertEquals("Dry run warnings", 0,
dryRunResultHolder.getWarnings().size());
String error = dryRunResultHolder.getErrors().iterator().next();
assertTrue("Unexpected error message",
error.contains("<maxResources> defined in percentages"));
}
private void assertNoValueForQueues(Set<String> queues, String postfix,
Configuration config) {
for (String queue : queues) {
@@ -518,6 +501,15 @@ private void assertNoValueForQueues(Set<String> queues, String postfix,
}
}
private void assertValueForQueues(Set<String> queues, String postfix,
Configuration config, String expectedValue) {
for (String queue : queues) {
String key = PREFIX + queue + postfix;
assertEquals("Key " + key + " has different value",
expectedValue, config.get(key));
}
}
private void assertTrueForQueues(Set<String> queues, String postfix,
Configuration config) {
for (String queue : queues) {

@@ -31,6 +31,7 @@
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<minResources>memory-mb=4096, vcores=1</minResources>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
@@ -38,6 +39,7 @@
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<minResources>memory-mb=4096, vcores=1</minResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>