YARN-10674. fs2cs should generate auto-created queue deletion properties. Contributed by Qi Zhu.

This commit is contained in:
Peter Bacsko 2021-03-24 08:15:06 +01:00
parent 4b4ccce02f
commit ceb75e1e2a
6 changed files with 261 additions and 13 deletions

View File

@ -118,6 +118,14 @@ public class FSConfigToCSConfigArgumentHandler {
"pc", "percentage",
"Converts FS queue weights to percentages",
false),
DISABLE_PREEMPTION("disable preemption", "dp", "disable-preemption",
"Disable the preemption with nopolicy or observeonly mode. " +
"Preemption is enabled by default. " +
"nopolicy removes ProportionalCapacityPreemptionPolicy from " +
"the list of monitor policies, " +
"observeonly sets " +
"yarn.resourcemanager.monitor.capacity.preemption.observe_only " +
"to true.", true),
HELP("help", "h", "help", "Displays the list of options", false);
private final String name;
@ -251,6 +259,12 @@ public class FSConfigToCSConfigArgumentHandler {
cliParser.getOptionValue(CliOption.CONVERSION_RULES.shortSwitch);
String outputDir =
cliParser.getOptionValue(CliOption.OUTPUT_DIR.shortSwitch);
FSConfigToCSConfigConverterParams.
PreemptionMode preemptionMode =
FSConfigToCSConfigConverterParams.
PreemptionMode.fromString(cliParser.
getOptionValue(CliOption.DISABLE_PREEMPTION.shortSwitch));
boolean convertPlacementRules =
!cliParser.hasOption(
CliOption.SKIP_PLACEMENT_RULES_CONVERSION.shortSwitch);
@ -260,6 +274,10 @@ public class FSConfigToCSConfigArgumentHandler {
checkFile(CliOption.CONVERSION_RULES, conversionRulesFile);
checkDirectory(CliOption.OUTPUT_DIR, outputDir);
checkOutputDirDoesNotContainXmls(yarnSiteXmlFile, outputDir);
if (cliParser.hasOption(CliOption.
DISABLE_PREEMPTION.shortSwitch)) {
checkDisablePreemption(preemptionMode);
}
// check mapping-rules.json if we intend to generate it
if (!cliParser.hasOption(CliOption.CONSOLE_MODE.shortSwitch) &&
@ -281,6 +299,7 @@ public class FSConfigToCSConfigArgumentHandler {
cliParser.hasOption(CliOption.RULES_TO_FILE.shortSwitch))
.withUsePercentages(
cliParser.hasOption(CliOption.CONVERT_PERCENTAGES.shortSwitch))
.withDisablePreemption(preemptionMode)
.build();
}
@ -383,6 +402,16 @@ public class FSConfigToCSConfigArgumentHandler {
}
}
private static void checkDisablePreemption(FSConfigToCSConfigConverterParams.
PreemptionMode preemptionMode) {
if (preemptionMode == FSConfigToCSConfigConverterParams.
PreemptionMode.ENABLED) {
throw new PreconditionException(
"Specified disable-preemption mode is illegal, " +
" use nopolicy or observeonly.");
}
}
private FSConfigToCSConfigConverter getConverter() {
return new FSConfigToCSConfigConverter(ruleHandler, conversionOptions);
}

View File

@ -103,6 +103,8 @@ public class FSConfigToCSConfigConverter {
private String outputDirectory;
private boolean rulesToFile;
private boolean usePercentages;
private FSConfigToCSConfigConverterParams.
PreemptionMode preemptionMode;
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
ruleHandler, ConversionOptions conversionOptions) {
@ -121,6 +123,7 @@ public class FSConfigToCSConfigConverter {
this.outputDirectory = params.getOutputDirectory();
this.rulesToFile = params.isPlacementRulesToFile();
this.usePercentages = params.isUsePercentages();
this.preemptionMode = params.getPreemptionMode();
prepareOutputFiles(params.isConsole());
loadConversionRules(params.getConversionRulesConfig());
Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
@ -277,7 +280,8 @@ public class FSConfigToCSConfigConverter {
new FSYarnSiteConverter();
siteConverter.convertSiteProperties(inputYarnSiteConfig,
convertedYarnSiteConfig, drfUsed,
conversionOptions.isEnableAsyncScheduler());
conversionOptions.isEnableAsyncScheduler(),
usePercentages, preemptionMode);
preemptionEnabled = siteConverter.isPreemptionEnabled();
sizeBasedWeight = siteConverter.isSizeBasedWeight();
@ -291,6 +295,7 @@ public class FSConfigToCSConfigConverter {
emitDefaultUserMaxParallelApplications();
emitUserMaxParallelApplications();
emitDefaultMaxAMShare();
emitDisablePreemptionForObserveOnlyMode();
FSQueueConverter queueConverter = FSQueueConverterBuilder.create()
.withRuleHandler(ruleHandler)
@ -407,6 +412,14 @@ public class FSConfigToCSConfigConverter {
queueMaxAMShareDefault);
}
}
private void emitDisablePreemptionForObserveOnlyMode() {
if (preemptionMode == FSConfigToCSConfigConverterParams
.PreemptionMode.OBSERVE_ONLY) {
capacitySchedulerConfig.
setBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, true);
}
}
private void emitACLs(FairScheduler fs) {
fs.getAllocationConfiguration().getQueueAcls()

View File

@ -30,6 +30,35 @@ public final class FSConfigToCSConfigConverterParams {
private boolean convertPlacementRules;
private boolean placementRulesToFile;
private boolean usePercentages;
private PreemptionMode preemptionMode;
public enum PreemptionMode {
ENABLED("enabled"),
NO_POLICY("nopolicy"),
OBSERVE_ONLY("observeonly");
private String cliOption;
PreemptionMode(String cliOption) {
this.cliOption = cliOption;
}
public String getCliOption() {
return cliOption;
}
public static PreemptionMode fromString(String cliOption) {
if (cliOption != null && cliOption.trim().
equals(PreemptionMode.OBSERVE_ONLY.getCliOption())) {
return PreemptionMode.OBSERVE_ONLY;
} else if (cliOption != null && cliOption.trim().
equals(PreemptionMode.NO_POLICY.getCliOption())) {
return PreemptionMode.NO_POLICY;
} else {
return PreemptionMode.ENABLED;
}
}
}
private FSConfigToCSConfigConverterParams() {
//must use builder
@ -71,6 +100,10 @@ public final class FSConfigToCSConfigConverterParams {
return usePercentages;
}
public PreemptionMode getPreemptionMode() {
return preemptionMode;
}
@Override
public String toString() {
return "FSConfigToCSConfigConverterParams{" +
@ -99,6 +132,7 @@ public final class FSConfigToCSConfigConverterParams {
private boolean convertPlacementRules;
private boolean placementRulesToFile;
private boolean usePercentages;
private PreemptionMode preemptionMode;
private Builder() {
}
@ -152,6 +186,11 @@ public final class FSConfigToCSConfigConverterParams {
return this;
}
public Builder withDisablePreemption(PreemptionMode preemptionMode) {
this.preemptionMode = preemptionMode;
return this;
}
public FSConfigToCSConfigConverterParams build() {
FSConfigToCSConfigConverterParams params =
new FSConfigToCSConfigConverterParams();
@ -164,6 +203,7 @@ public final class FSConfigToCSConfigConverterParams {
params.convertPlacementRules = this.convertPlacementRules;
params.placementRulesToFile = this.placementRulesToFile;
params.usePercentages = this.usePercentages;
params.preemptionMode = this.preemptionMode;
return params;
}
}

View File

@ -20,6 +20,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
@ -36,7 +38,9 @@ public class FSYarnSiteConverter {
@SuppressWarnings({"deprecation", "checkstyle:linelength"})
public void convertSiteProperties(Configuration conf,
Configuration yarnSiteConfig, boolean drfUsed, boolean enableAsyncScheduler) {
Configuration yarnSiteConfig, boolean drfUsed,
boolean enableAsyncScheduler, boolean userPercentage,
FSConfigToCSConfigConverterParams.PreemptionMode preemptionMode) {
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());
@ -52,12 +56,20 @@ public class FSYarnSiteConverter {
"schedule-asynchronously.scheduling-interval-ms", interval);
}
// This should be always true to trigger cs auto
// refresh queue.
yarnSiteConfig.setBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
if (conf.getBoolean(FairSchedulerConfiguration.PREEMPTION,
FairSchedulerConfiguration.DEFAULT_PREEMPTION)) {
yarnSiteConfig.setBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
preemptionEnabled = true;
String policies = addMonitorPolicy(ProportionalCapacityPreemptionPolicy.
class.getCanonicalName(), yarnSiteConfig);
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
policies);
int waitTimeBeforeKill = conf.getInt(
FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL,
FairSchedulerConfiguration.DEFAULT_WAIT_TIME_BEFORE_KILL);
@ -71,6 +83,23 @@ public class FSYarnSiteConverter {
yarnSiteConfig.setLong(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
waitBeforeNextStarvationCheck);
} else {
if (preemptionMode ==
FSConfigToCSConfigConverterParams.PreemptionMode.NO_POLICY) {
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES, "");
}
}
// For auto created queue's auto deletion.
if (!userPercentage) {
String policies = addMonitorPolicy(AutoCreatedQueueDeletionPolicy.
class.getCanonicalName(), yarnSiteConfig);
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
policies);
// Set the expired for deletion interval to 10s, consistent with fs.
yarnSiteConfig.setInt(CapacitySchedulerConfiguration.
AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME, 10);
}
if (conf.getBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE,
@ -132,4 +161,17 @@ public class FSYarnSiteConverter {
public boolean isSizeBasedWeight() {
return sizeBasedWeight;
}
private String addMonitorPolicy(String policyName,
Configuration yarnSiteConfig) {
String policies =
yarnSiteConfig.get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES);
if (policies == null || policies.isEmpty()) {
policies = policyName;
} else {
policies += "," + policyName;
}
return policies;
}
}

View File

@ -687,6 +687,21 @@ public class TestFSConfigToCSConfigConverter {
schedulingEnabledValue);
}
@Test
public void testSiteDisabledPreemptionWithObserveOnlyConversion()
throws Exception{
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withDisablePreemption(FSConfigToCSConfigConverterParams.
PreemptionMode.OBSERVE_ONLY)
.build();
converter.convert(params);
assertTrue("The observe only should be true",
converter.getCapacitySchedulerConfig().
getBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, false));
}
private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)

View File

@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueDeletionPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
@ -28,6 +30,7 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNotEquals;
/**
* Unit tests for FSYarnSiteConverter.
@ -37,6 +40,8 @@ public class TestFSYarnSiteConverter {
private Configuration yarnConfig;
private FSYarnSiteConverter converter;
private Configuration yarnConvertedConfig;
private static final String DELETION_POLICY_CLASS =
AutoCreatedQueueDeletionPolicy.class.getCanonicalName();
@Before
public void setup() {
@ -54,7 +59,7 @@ public class TestFSYarnSiteConverter {
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false));
@ -73,7 +78,7 @@ public class TestFSYarnSiteConverter {
321);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertTrue("Preemption enabled",
yarnConvertedConfig.getBoolean(
@ -87,6 +92,41 @@ public class TestFSYarnSiteConverter {
yarnConvertedConfig.getInt(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
-1));
assertFalse("Observe_only should be false",
yarnConvertedConfig.getBoolean(CapacitySchedulerConfiguration.
PREEMPTION_OBSERVE_ONLY, false));
assertTrue("Should contain ProportionalCapacityPreemptionPolicy.",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
contains(ProportionalCapacityPreemptionPolicy.
class.getCanonicalName()));
}
@Test
public void testSiteDisabledPreemptionWithNoPolicyConversion() {
// Default mode is nopolicy
yarnConfig.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false, false, null);
assertFalse("Should not contain ProportionalCapacityPreemptionPolicy.",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
contains(ProportionalCapacityPreemptionPolicy.
class.getCanonicalName()));
yarnConfig.setBoolean(FairSchedulerConfiguration.PREEMPTION, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false, false,
FSConfigToCSConfigConverterParams.PreemptionMode.NO_POLICY);
assertFalse("Should not contain ProportionalCapacityPreemptionPolicy.",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
contains(ProportionalCapacityPreemptionPolicy.
class.getCanonicalName()));
}
@Test
@ -94,7 +134,7 @@ public class TestFSYarnSiteConverter {
yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertTrue("Assign multiple",
yarnConvertedConfig.getBoolean(
@ -107,7 +147,7 @@ public class TestFSYarnSiteConverter {
yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertEquals("Max assign", 111,
yarnConvertedConfig.getInt(
@ -122,7 +162,7 @@ public class TestFSYarnSiteConverter {
"321.321");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertEquals("Locality threshold node", "123.123",
yarnConvertedConfig.get(
@ -135,7 +175,7 @@ public class TestFSYarnSiteConverter {
@Test
public void testSiteDrfEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
false);
false, false, null);
assertEquals("Resource calculator type", DominantResourceCalculator.class,
yarnConvertedConfig.getClass(
@ -145,7 +185,7 @@ public class TestFSYarnSiteConverter {
@Test
public void testSiteDrfDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertEquals("Resource calculator type", DefaultResourceCalculator.class,
yarnConvertedConfig.getClass(
@ -156,7 +196,7 @@ public class TestFSYarnSiteConverter {
@Test
public void testAsyncSchedulingEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
true);
true, false, null);
assertTrue("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
@ -166,10 +206,79 @@ public class TestFSYarnSiteConverter {
@Test
public void testAsyncSchedulingDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
false, false, null);
assertFalse("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE));
}
@Test
public void testSiteQueueAutoDeletionConversionWithWeightMode() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false, false, null);
assertTrue(yarnConvertedConfig.get(YarnConfiguration.
RM_SCHEDULER_ENABLE_MONITORS), true);
assertTrue("Scheduling Policies contain auto deletion policy",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES)
.contains(DELETION_POLICY_CLASS));
// Test when policy has existed.
yarnConvertedConfig.
set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
"testPolicy");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false, false, null);
assertTrue("Scheduling Policies contain auto deletion policy",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES)
.contains(DELETION_POLICY_CLASS));
assertEquals("Auto deletion policy expired time should be 10s",
10, yarnConvertedConfig.
getLong(CapacitySchedulerConfiguration.
AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
CapacitySchedulerConfiguration.
DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME));
}
@Test
public void
testSiteQueueAutoDeletionConversionDisabledForPercentageMode() {
// test percentage mode
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false, true, null);
assertTrue(yarnConvertedConfig.get(YarnConfiguration.
RM_SCHEDULER_ENABLE_MONITORS), true);
assertTrue("Scheduling Policies should not" +
"contain auto deletion policy in percentage mode",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES) == null ||
!yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES).
contains(DELETION_POLICY_CLASS));
yarnConvertedConfig.
set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
"testPolicy");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false, true, null);
assertFalse("Scheduling Policies should not " +
"contain auto deletion policy in percentage mode",
yarnConvertedConfig.
get(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES)
.contains(DELETION_POLICY_CLASS));
assertNotEquals("Auto deletion policy expired time should not " +
"be set in percentage mode",
10, yarnConvertedConfig.
getLong(CapacitySchedulerConfiguration.
AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME,
CapacitySchedulerConfiguration.
DEFAULT_AUTO_CREATE_CHILD_QUEUE_EXPIRED_TIME));
}
}