YARN-10525. Add weight mode conversion to fs2cs. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2021-01-13 14:24:12 +01:00
parent 61f77b7674
commit 836c630430
16 changed files with 608 additions and 273 deletions

View File

@ -115,6 +115,10 @@ public class FSConfigToCSConfigArgumentHandler {
RULES_TO_FILE("rules to external file", "e", "rules-to-file",
"Generates the converted placement rules to an external JSON file " +
"called mapping-rules.json", false),
CONVERT_PERCENTAGES("convert weights to percentages",
"pc", "percentage",
"Converts FS queue weights to percentages",
false),
HELP("help", "h", "help", "Displays the list of options", false);
private final String name;
@ -275,6 +279,8 @@ public class FSConfigToCSConfigArgumentHandler {
.withConvertPlacementRules(convertPlacementRules)
.withPlacementRulesToFile(
cliParser.hasOption(CliOption.RULES_TO_FILE.shortSwitch))
.withUsePercentages(
cliParser.hasOption(CliOption.CONVERT_PERCENTAGES.shortSwitch))
.build();
}

View File

@ -102,6 +102,7 @@ public class FSConfigToCSConfigConverter {
private boolean convertPlacementRules = true;
private String outputDirectory;
private boolean rulesToFile;
private boolean usePercentages;
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
ruleHandler, ConversionOptions conversionOptions) {
@ -119,6 +120,7 @@ public class FSConfigToCSConfigConverter {
this.convertPlacementRules = params.isConvertPlacementRules();
this.outputDirectory = params.getOutputDirectory();
this.rulesToFile = params.isPlacementRulesToFile();
this.usePercentages = params.isUsePercentages();
prepareOutputFiles(params.isConsole());
loadConversionRules(params.getConversionRulesConfig());
Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
@ -300,6 +302,7 @@ public class FSConfigToCSConfigConverter {
.withQueueMaxAppsDefault(queueMaxAppsDefault)
.withConversionOptions(conversionOptions)
.withDrfUsed(drfUsed)
.withPercentages(usePercentages)
.build();
queueConverter.convertQueueHierarchy(rootQueue);

View File

@ -29,7 +29,7 @@ public final class FSConfigToCSConfigConverterParams {
private String outputDirectory;
private boolean convertPlacementRules;
private boolean placementRulesToFile;
private boolean usePercentages;
private FSConfigToCSConfigConverterParams() {
//must use builder
@ -67,6 +67,10 @@ public final class FSConfigToCSConfigConverterParams {
return placementRulesToFile;
}
public boolean isUsePercentages() {
return usePercentages;
}
@Override
public String toString() {
return "FSConfigToCSConfigConverterParams{" +
@ -84,6 +88,7 @@ public final class FSConfigToCSConfigConverterParams {
* Builder that can construct FSConfigToCSConfigConverterParams objects.
*
*/
@SuppressWarnings("checkstyle:hiddenfield")
public static final class Builder {
private String yarnSiteXmlConfig;
private String fairSchedulerXmlConfig;
@ -93,6 +98,7 @@ public final class FSConfigToCSConfigConverterParams {
private String outputDirectory;
private boolean convertPlacementRules;
private boolean placementRulesToFile;
private boolean usePercentages;
private Builder() {
}
@ -141,6 +147,11 @@ public final class FSConfigToCSConfigConverterParams {
return this;
}
public Builder withUsePercentages(boolean usePercentages) {
this.usePercentages = usePercentages;
return this;
}
public FSConfigToCSConfigConverterParams build() {
FSConfigToCSConfigConverterParams params =
new FSConfigToCSConfigConverterParams();
@ -152,6 +163,7 @@ public final class FSConfigToCSConfigConverterParams {
params.outputDirectory = this.outputDirectory;
params.convertPlacementRules = this.convertPlacementRules;
params.placementRulesToFile = this.placementRulesToFile;
params.usePercentages = this.usePercentages;
return params;
}
}

View File

@ -18,17 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion.CapacityConverter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion.CapacityConverterFactory;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@ -54,6 +53,7 @@ public class FSQueueConverter {
private final float queueMaxAMShareDefault;
private final int queueMaxAppsDefault;
private final boolean drfUsed;
private final boolean usePercentages;
private ConversionOptions conversionOptions;
@ -67,6 +67,7 @@ public class FSQueueConverter {
this.queueMaxAppsDefault = builder.queueMaxAppsDefault;
this.conversionOptions = builder.conversionOptions;
this.drfUsed = builder.drfUsed;
this.usePercentages = builder.usePercentages;
}
public void convertQueueHierarchy(FSQueue queue) {
@ -267,24 +268,14 @@ public class FSQueueConverter {
* @param queue
*/
private void emitChildCapacity(FSQueue queue) {
List<FSQueue> children = queue.getChildQueues();
CapacityConverter converter =
CapacityConverterFactory.getConverter(usePercentages);
int totalWeight = getTotalWeight(children);
Pair<Map<String, BigDecimal>, Boolean> result =
WeightToCapacityConversionUtil.getCapacities(
totalWeight, children, ruleHandler);
converter.convertWeightsForChildQueues(queue,
capacitySchedulerConfig);
Map<String, BigDecimal> capacities = result.getLeft();
boolean shouldAllowZeroSumCapacity = result.getRight();
capacities
.forEach((key, value) -> capacitySchedulerConfig.set(PREFIX + key +
".capacity", value.toString()));
if (shouldAllowZeroSumCapacity) {
String queueName = queue.getName();
capacitySchedulerConfig.setBoolean(
PREFIX + queueName + ".allow-zero-capacity-sum", true);
if (Resources.none().compareTo(queue.getMinShare()) != 0) {
ruleHandler.handleMinResources();
}
}
@ -305,14 +296,6 @@ public class FSQueueConverter {
}
}
private int getTotalWeight(List<FSQueue> children) {
double sum = children
.stream()
.mapToDouble(c -> c.getWeight())
.sum();
return (int) sum;
}
private String getQueueShortName(String queueName) {
int lastDot = queueName.lastIndexOf(".");
return queueName.substring(lastDot + 1);

View File

@ -32,6 +32,7 @@ public final class FSQueueConverterBuilder {
int queueMaxAppsDefault;
ConversionOptions conversionOptions;
boolean drfUsed;
boolean usePercentages;
private FSQueueConverterBuilder() {
}
@ -93,6 +94,11 @@ public final class FSQueueConverterBuilder {
return this;
}
public FSQueueConverterBuilder withPercentages(boolean usePercentages) {
this.usePercentages = usePercentages;
return this;
}
public FSQueueConverter build() {
return new FSQueueConverter(this);
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
public interface CapacityConverter {
void convertWeightsForChildQueues(FSQueue queue, Configuration csConfig);
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
public final class CapacityConverterFactory {
private CapacityConverterFactory() {
// no instances
}
public static CapacityConverter getConverter(
boolean usePercentage) {
return usePercentage ?
new WeightToPercentConverter() : new WeightToWeightConverter();
}
}

View File

@ -1,22 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import java.math.BigDecimal;
import java.math.RoundingMode;
@ -27,32 +29,41 @@ import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Utility class that converts Fair Scheduler weights to capacities in
* percentages.
*
* It also makes sure that the sum of the capacities adds up to exactly 100.0.
*
* There is a special case when one or more queues have a capacity of 0. This
* can happen if the weight was originally 0 in the FS configuration. In
* this case, we need an extra queue with a capacity of 100.0 to have a valid
* CS configuration.
*/
final class WeightToCapacityConversionUtil {
public class WeightToPercentConverter
implements CapacityConverter {
private static final BigDecimal HUNDRED = new BigDecimal(100).setScale(3);
private static final BigDecimal ZERO = new BigDecimal(0).setScale(3);
private WeightToCapacityConversionUtil() {
// no instances
@Override
public void convertWeightsForChildQueues(FSQueue queue,
Configuration csConfig) {
List<FSQueue> children = queue.getChildQueues();
int totalWeight = getTotalWeight(children);
Pair<Map<String, BigDecimal>, Boolean> result =
getCapacities(totalWeight, children);
Map<String, BigDecimal> capacities = result.getLeft();
boolean shouldAllowZeroSumCapacity = result.getRight();
capacities
.forEach((key, value) -> csConfig.set(PREFIX + key +
".capacity", value.toString()));
if (shouldAllowZeroSumCapacity) {
String queueName = queue.getName();
csConfig.setBoolean(
PREFIX + queueName + ".allow-zero-capacity-sum", true);
}
}
@VisibleForTesting
static Pair<Map<String, BigDecimal>, Boolean> getCapacities(int totalWeight,
List<FSQueue> children, FSConfigToCSConfigRuleHandler ruleHandler) {
private Pair<Map<String, BigDecimal>, Boolean> getCapacities(int totalWeight,
List<FSQueue> children) {
if (children.size() == 0) {
return Pair.of(new HashMap<>(), false);
@ -82,10 +93,6 @@ final class WeightToCapacityConversionUtil {
.setScale(3);
}
if (Resources.none().compareTo(queue.getMinShare()) != 0) {
ruleHandler.handleMinResources();
}
capacities.put(queue.getName(), pct);
});
@ -105,9 +112,8 @@ final class WeightToCapacityConversionUtil {
}
@VisibleForTesting
static boolean fixCapacities(Map<String, BigDecimal> capacities,
boolean fixCapacities(Map<String, BigDecimal> capacities,
BigDecimal totalPct) {
final BigDecimal hundred = new BigDecimal(100).setScale(3);
boolean shouldAllowZeroSumCapacity = false;
// Sort the list so we'll adjust the highest capacity value,
@ -134,11 +140,19 @@ final class WeightToCapacityConversionUtil {
// because we have zero weights on this level
shouldAllowZeroSumCapacity = true;
} else {
BigDecimal diff = hundred.subtract(totalPct);
BigDecimal diff = HUNDRED.subtract(totalPct);
BigDecimal correctedHighest = highestCapacity.add(diff);
capacities.put(highestCapacityQueue, correctedHighest);
}
return shouldAllowZeroSumCapacity;
}
private int getTotalWeight(List<FSQueue> children) {
double sum = children
.stream()
.mapToDouble(c -> c.getWeight())
.sum();
return (int) sum;
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
public class WeightToWeightConverter
implements CapacityConverter {
@Override
public void convertWeightsForChildQueues(FSQueue queue,
Configuration csConfig) {
List<FSQueue> children = queue.getChildQueues();
children.forEach(fsQueue -> csConfig.set(
getProperty(fsQueue), getWeightString(fsQueue)));
}
private String getProperty(FSQueue queue) {
return PREFIX + queue.getName() + ".capacity";
}
private String getWeightString(FSQueue queue) {
return Float.toString(queue.getWeight()) + "w";
}
}

View File

@ -713,4 +713,41 @@ public class TestFSConfigToCSConfigArgumentHandler {
assertFalse("-a switch wasn't provided but async scheduling option is true",
conversionOptions.isEnableAsyncScheduler());
}
@Test
public void testUsePercentages() throws Exception {
testUsePercentages(true);
}
@Test
public void testUseWeights() throws Exception {
testUsePercentages(false);
}
private void testUsePercentages(boolean enabled) throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator);
argumentHandler.setConverterSupplier(this::getMockConverter);
String[] args;
if (enabled) {
args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p",
"-pc");
} else {
args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p");
}
argumentHandler.parseAndConvert(args);
ArgumentCaptor<FSConfigToCSConfigConverterParams> captor =
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
verify(mockConverter).convert(captor.capture());
FSConfigToCSConfigConverterParams params = captor.getValue();
assertEquals("Use percentages", enabled, params.isUsePercentages());
}
}

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.function.Consumer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@ -59,22 +60,48 @@ public class TestFSConfigToCSConfigConverterMain {
/*
* Example command:
* opt/hadoop/bin/yarn fs2cs
* /opt/hadoop/bin/yarn fs2cs
* -o /tmp/output
* -y /opt/hadoop/etc/hadoop/yarn-site.xml
* -f /opt/hadoop/etc/hadoop/fair-scheduler.xml
* -r /home/systest/sample-rules-config.properties
*/
@Test
public void testConvertFSConfigurationDefaults()
public void testConvertFSConfigurationDefaultsWeightMode()
throws Exception {
testConvertFSConfigurationDefaults(false);
}
/*
* Example command:
* /opt/hadoop/bin/yarn fs2cs
* -pc
* -o /tmp/output
* -y /opt/hadoop/etc/hadoop/yarn-site.xml
* -f /opt/hadoop/etc/hadoop/fair-scheduler.xml
* -r /home/systest/sample-rules-config.properties
*/
@Test
public void testConvertFSConfigurationDefaultsPercentageMode()
throws IOException {
testConvertFSConfigurationDefaults(true);
}
private void testConvertFSConfigurationDefaults(boolean percentage)
throws IOException {
setupFSConfigConversionFiles();
FSConfigToCSConfigConverterMain.main(new String[] {
String[] args = new String[] {
"-o", OUTPUT_DIR,
"-y", YARN_SITE_XML,
"-f", FS_ALLOC_FILE,
"-r", CONVERSION_RULES_FILE});
"-r", CONVERSION_RULES_FILE};
if (percentage) {
args = Arrays.copyOf(args, args.length + 1);
args[args.length - 1] = "-pc";
}
FSConfigToCSConfigConverterMain.main(args);
boolean csConfigExists =
new File(OUTPUT_DIR, "capacity-scheduler.xml").exists();
@ -142,6 +169,7 @@ public class TestFSConfigToCSConfigConverterMain {
"--print",
"--convert-placement-rules",
"--rules-to-file",
"--percentage",
"--yarnsiteconfig", YARN_SITE_XML,
"--fsconfig", FS_ALLOC_FILE,
"--rulesconfig", CONVERSION_RULES_FILE});

View File

@ -265,8 +265,8 @@ public class TestFSQueueConverter {
}
@Test
public void testChildCapacity() {
converter = builder.build();
public void testChildCapacityInCapacityMode() {
converter = builder.withPercentages(true).build();
converter.convertQueueHierarchy(rootQueue);
@ -299,9 +299,44 @@ public class TestFSQueueConverter {
csConfig.get(PREFIX + "root.misc.b.capacity"));
}
@Test
public void testChildCapacityInWeightMode() {
converter = builder.withPercentages(false).build();
converter.convertQueueHierarchy(rootQueue);
// root
assertEquals("root.default weight", "1.0w",
csConfig.get(PREFIX + "root.default.capacity"));
assertEquals("root.admins weight", "1.0w",
csConfig.get(PREFIX + "root.admins.capacity"));
assertEquals("root.users weight", "1.0w",
csConfig.get(PREFIX + "root.users.capacity"));
// root.users
assertEquals("root.users.john weight", "1.0w",
csConfig.get(PREFIX + "root.users.john.capacity"));
assertEquals("root.users.joe weight", "3.0w",
csConfig.get(PREFIX + "root.users.joe.capacity"));
// root.admins
assertEquals("root.admins.alice weight", "3.0w",
csConfig.get(PREFIX + "root.admins.alice.capacity"));
assertEquals("root.admins.bob weight", "1.0w",
csConfig.get(PREFIX + "root.admins.bob.capacity"));
// root.misc
assertEquals("root.misc weight", "0.0w",
csConfig.get(PREFIX + "root.misc.capacity"));
assertEquals("root.misc.a weight", "0.0w",
csConfig.get(PREFIX + "root.misc.a.capacity"));
assertEquals("root.misc.b weight", "0.0w",
csConfig.get(PREFIX + "root.misc.b.capacity"));
}
@Test
public void testZeroSumCapacityValidation() {
converter = builder.build();
converter = builder.withPercentages(true).build();
converter.convertQueueHierarchy(rootQueue);

View File

@ -1,194 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class TestWeightToCapacityConversionUtil {
@Mock
private FSConfigToCSConfigRuleHandler ruleHandler;
@Test
public void testSingleWeightConversion() {
List<FSQueue> queues = createFSQueues(1);
Pair<Map<String, BigDecimal>, Boolean> conversion =
WeightToCapacityConversionUtil.getCapacities(1, queues, ruleHandler);
assertFalse("Capacity zerosum allowed", conversion.getRight());
assertEquals("Capacity", new BigDecimal("100.000"),
conversion.getLeft().get("root.a"));
}
@Test
public void testNoChildQueueConversion() {
List<FSQueue> queues = new ArrayList<>();
Pair<Map<String, BigDecimal>, Boolean> conversion =
WeightToCapacityConversionUtil.getCapacities(1, queues, ruleHandler);
assertEquals("Converted items", 0, conversion.getLeft().size());
}
@Test
public void testMultiWeightConversion() {
List<FSQueue> queues = createFSQueues(1, 2, 3);
Pair<Map<String, BigDecimal>, Boolean> conversion =
WeightToCapacityConversionUtil.getCapacities(6, queues, ruleHandler);
Map<String, BigDecimal> capacities = conversion.getLeft();
assertEquals("Number of queues", 3, capacities.size());
// this is no fixing - it's the result of BigDecimal rounding
assertEquals("root.a capacity", new BigDecimal("16.667"),
capacities.get("root.a"));
assertEquals("root.b capacity", new BigDecimal("33.333"),
capacities.get("root.b"));
assertEquals("root.c capacity", new BigDecimal("50.000"),
capacities.get("root.c"));
}
@Test
public void testMultiWeightConversionWhenOfThemIsZero() {
List<FSQueue> queues = createFSQueues(0, 1, 1);
Pair<Map<String, BigDecimal>, Boolean> conversion =
WeightToCapacityConversionUtil.getCapacities(2, queues, ruleHandler);
Map<String, BigDecimal> capacities = conversion.getLeft();
assertFalse("Capacity zerosum allowed", conversion.getRight());
assertEquals("Number of queues", 3, capacities.size());
assertEquals("root.a capacity", new BigDecimal("0.000"),
capacities.get("root.a"));
assertEquals("root.b capacity", new BigDecimal("50.000"),
capacities.get("root.b"));
assertEquals("root.c capacity", new BigDecimal("50.000"),
capacities.get("root.c"));
}
@Test
public void testMultiWeightConversionWhenAllOfThemAreZero() {
List<FSQueue> queues = createFSQueues(0, 0, 0);
Pair<Map<String, BigDecimal>, Boolean> conversion =
WeightToCapacityConversionUtil.getCapacities(0, queues, ruleHandler);
Map<String, BigDecimal> capacities = conversion.getLeft();
assertEquals("Number of queues", 3, capacities.size());
assertTrue("Capacity zerosum allowed", conversion.getRight());
assertEquals("root.a capacity", new BigDecimal("0.000"),
capacities.get("root.a"));
assertEquals("root.b capacity", new BigDecimal("0.000"),
capacities.get("root.b"));
assertEquals("root.c capacity", new BigDecimal("0.000"),
capacities.get("root.c"));
}
@Test
public void testCapacityFixingWithThreeQueues() {
List<FSQueue> queues = createFSQueues(1, 1, 1);
Pair<Map<String, BigDecimal>, Boolean> conversion =
WeightToCapacityConversionUtil.getCapacities(3, queues, ruleHandler);
Map<String, BigDecimal> capacities = conversion.getLeft();
assertEquals("Number of queues", 3, capacities.size());
assertEquals("root.a capacity", new BigDecimal("33.334"),
capacities.get("root.a"));
assertEquals("root.b capacity", new BigDecimal("33.333"),
capacities.get("root.b"));
assertEquals("root.c capacity", new BigDecimal("33.333"),
capacities.get("root.c"));
}
@Test
public void testCapacityFixingWhenTotalCapacityIsGreaterThanHundred() {
Map<String, BigDecimal> capacities = new HashMap<>();
capacities.put("root.a", new BigDecimal("50.001"));
capacities.put("root.b", new BigDecimal("25.500"));
capacities.put("root.c", new BigDecimal("25.500"));
testCapacityFixing(capacities, new BigDecimal("100.001"));
}
@Test
public void testCapacityFixWhenTotalCapacityIsLessThanHundred() {
Map<String, BigDecimal> capacities = new HashMap<>();
capacities.put("root.a", new BigDecimal("49.999"));
capacities.put("root.b", new BigDecimal("25.500"));
capacities.put("root.c", new BigDecimal("25.500"));
testCapacityFixing(capacities, new BigDecimal("99.999"));
}
private void testCapacityFixing(Map<String, BigDecimal> capacities,
BigDecimal total) {
// Note: we call fixCapacities() directly because it makes
// testing easier
boolean needCapacityValidationRelax =
WeightToCapacityConversionUtil.fixCapacities(capacities,
total);
assertFalse("Capacity zerosum allowed", needCapacityValidationRelax);
assertEquals("root.a capacity", new BigDecimal("50.000"),
capacities.get("root.a"));
assertEquals("root.b capacity", new BigDecimal("25.500"),
capacities.get("root.b"));
assertEquals("root.c capacity", new BigDecimal("25.500"),
capacities.get("root.c"));
}
private List<FSQueue> createFSQueues(int... weights){
char current = 'a';
List<FSQueue> queues = new ArrayList<>();
for (int w : weights) {
FSQueue queue = mock(FSQueue.class);
when(queue.getWeight()).thenReturn((float)w);
when(queue.getName()).thenReturn(
"root." + new String(new char[] {current}));
when(queue.getMinShare()).thenReturn(Resources.none());
current++;
queues.add(queue);
}
return queues;
}
}

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.junit.Before;
import org.junit.Test;
public class TestWeightToPercentageConverter
extends WeightConverterTestBase {
private WeightToPercentConverter converter;
private Configuration config;
@Before
public void setup() {
converter = new WeightToPercentConverter();
config = new Configuration(false);
}
@Test
public void testSingleWeightConversion() {
FSQueue root = createFSQueues(1);
converter.convertWeightsForChildQueues(root, config);
assertFalse("Capacity zerosum allowed",
config.getBoolean(PREFIX + "root.allow-zero-capacity-sum",
false));
assertEquals("root.a capacity", "100.000",
config.get(PREFIX + "root.a.capacity"));
}
@Test
public void testNoChildQueueConversion() {
FSQueue root = createFSQueues();
converter.convertWeightsForChildQueues(root, config);
assertEquals("Converted items", 0,
config.getPropsWithPrefix(PREFIX).size());
}
@Test
public void testMultiWeightConversion() {
FSQueue root = createFSQueues(1, 2, 3);
converter.convertWeightsForChildQueues(root, config);
assertEquals("Number of properties", 3,
config.getPropsWithPrefix(PREFIX).size());
// this is no fixing - it's the result of BigDecimal rounding
assertEquals("root.a capacity", "16.667",
config.get(PREFIX + "root.a.capacity"));
assertEquals("root.b capacity", "33.333",
config.get(PREFIX + "root.b.capacity"));
assertEquals("root.c capacity", "50.000",
config.get(PREFIX + "root.c.capacity"));
}
@Test
public void testMultiWeightConversionWhenOfThemIsZero() {
FSQueue root = createFSQueues(0, 1, 1);
converter.convertWeightsForChildQueues(root, config);
assertFalse("Capacity zerosum allowed",
config.getBoolean(PREFIX + "root.allow-zero-capacity-sum",
false));
assertEquals("Number of properties", 3,
config.getPropsWithPrefix(PREFIX).size());
assertEquals("root.a capacity", "0.000",
config.get(PREFIX + "root.a.capacity"));
assertEquals("root.b capacity", "50.000",
config.get(PREFIX + "root.b.capacity"));
assertEquals("root.c capacity", "50.000",
config.get(PREFIX + "root.c.capacity"));
}
@Test
public void testMultiWeightConversionWhenAllOfThemAreZero() {
FSQueue root = createFSQueues(0, 0, 0);
converter.convertWeightsForChildQueues(root, config);
assertEquals("Number of properties", 4,
config.getPropsWithPrefix(PREFIX).size());
assertTrue("Capacity zerosum allowed",
config.getBoolean(PREFIX + "root.allow-zero-capacity-sum",
false));
assertEquals("root.a capacity", "0.000",
config.get(PREFIX + "root.a.capacity"));
assertEquals("root.b capacity", "0.000",
config.get(PREFIX + "root.b.capacity"));
assertEquals("root.c capacity", "0.000",
config.get(PREFIX + "root.c.capacity"));
}
@Test
public void testCapacityFixingWithThreeQueues() {
FSQueue root = createFSQueues(1, 1, 1);
converter.convertWeightsForChildQueues(root, config);
assertEquals("Number of properties", 3,
config.getPropsWithPrefix(PREFIX).size());
assertEquals("root.a capacity", "33.334",
config.get(PREFIX + "root.a.capacity"));
assertEquals("root.b capacity", "33.333",
config.get(PREFIX + "root.b.capacity"));
assertEquals("root.c capacity", "33.333",
config.get(PREFIX + "root.c.capacity"));
}
@Test
public void testCapacityFixingWhenTotalCapacityIsGreaterThanHundred() {
Map<String, BigDecimal> capacities = new HashMap<>();
capacities.put("root.a", new BigDecimal("50.001"));
capacities.put("root.b", new BigDecimal("25.500"));
capacities.put("root.c", new BigDecimal("25.500"));
testCapacityFixing(capacities, new BigDecimal("100.001"));
}
@Test
public void testCapacityFixWhenTotalCapacityIsLessThanHundred() {
Map<String, BigDecimal> capacities = new HashMap<>();
capacities.put("root.a", new BigDecimal("49.999"));
capacities.put("root.b", new BigDecimal("25.500"));
capacities.put("root.c", new BigDecimal("25.500"));
testCapacityFixing(capacities, new BigDecimal("99.999"));
}
private void testCapacityFixing(Map<String, BigDecimal> capacities,
BigDecimal total) {
// Note: we call fixCapacities() directly because it makes
// testing easier
boolean needCapacityValidationRelax =
converter.fixCapacities(capacities,
total);
assertFalse("Capacity zerosum allowed", needCapacityValidationRelax);
assertEquals("root.a capacity", new BigDecimal("50.000"),
capacities.get("root.a"));
assertEquals("root.b capacity", new BigDecimal("25.500"),
capacities.get("root.b"));
assertEquals("root.c capacity", new BigDecimal("25.500"),
capacities.get("root.c"));
}
}

View File

@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.junit.Before;
import org.junit.Test;
public class TestWeightToWeightConverter extends WeightConverterTestBase {
private WeightToWeightConverter converter;
private Configuration config;
@Before
public void setup() {
converter = new WeightToWeightConverter();
config = new Configuration(false);
}
@Test
public void testNoChildQueueConversion() {
FSQueue root = createFSQueues();
converter.convertWeightsForChildQueues(root, config);
assertEquals("Converted items", 0,
config.getPropsWithPrefix(PREFIX).size());
}
@Test
public void testSingleWeightConversion() {
FSQueue root = createFSQueues(1);
converter.convertWeightsForChildQueues(root, config);
assertEquals("root.a weight", "1.0w",
config.get(PREFIX + "root.a.capacity"));
}
@Test
public void testMultiWeightConversion() {
FSQueue root = createFSQueues(1, 2, 3);
converter.convertWeightsForChildQueues(root, config);
assertEquals("Number of properties", 3,
config.getPropsWithPrefix(PREFIX).size());
assertEquals("root.a weight", "1.0w",
config.get(PREFIX + "root.a.capacity"));
assertEquals("root.b weight", "2.0w",
config.get(PREFIX + "root.b.capacity"));
assertEquals("root.c weight", "3.0w",
config.get(PREFIX + "root.c.capacity"));
}
}

View File

@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.util.resource.Resources;
public abstract class WeightConverterTestBase {
protected FSQueue createFSQueues(int... weights){
char current = 'a';
List<FSQueue> queues = new ArrayList<>();
for (int w : weights) {
FSQueue queue = mock(FSQueue.class);
when(queue.getWeight()).thenReturn((float)w);
when(queue.getName()).thenReturn(
"root." + new String(new char[] {current}));
when(queue.getMinShare()).thenReturn(Resources.none());
current++;
queues.add(queue);
}
FSQueue root = mock(FSQueue.class);
when(root.getWeight()).thenReturn(1.0f);
when(root.getName()).thenReturn("root");
when(root.getMinShare()).thenReturn(Resources.none());
when(root.getChildQueues()).thenReturn(queues);
return root;
}
}