YARN-10292. FS-CS converter: add an option to enable asynchronous scheduling in CapacityScheduler. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2020-06-05 15:00:39 +02:00
parent 545a0a147c
commit 8abff5151a
7 changed files with 122 additions and 9 deletions

View File

@ -22,6 +22,7 @@ public class ConversionOptions {
private DryRunResultHolder dryRunResultHolder; private DryRunResultHolder dryRunResultHolder;
private boolean dryRun; private boolean dryRun;
private boolean noTerminalRuleCheck; private boolean noTerminalRuleCheck;
private boolean enableAsyncScheduler;
public ConversionOptions(DryRunResultHolder dryRunResultHolder, public ConversionOptions(DryRunResultHolder dryRunResultHolder,
boolean dryRun) { boolean dryRun) {
@ -41,6 +42,14 @@ public class ConversionOptions {
return noTerminalRuleCheck; return noTerminalRuleCheck;
} }
public void setEnableAsyncScheduler(boolean enableAsyncScheduler) {
this.enableAsyncScheduler = enableAsyncScheduler;
}
public boolean isEnableAsyncScheduler() {
return enableAsyncScheduler;
}
public void handleWarning(String msg, Logger log) { public void handleWarning(String msg, Logger log) {
if (dryRun) { if (dryRun) {
dryRunResultHolder.addDryRunWarning(msg); dryRunResultHolder.addDryRunWarning(msg);

View File

@ -109,6 +109,9 @@ public class FSConfigToCSConfigArgumentHandler {
SKIP_VERIFICATION("skip verification", "s", SKIP_VERIFICATION("skip verification", "s",
"skip-verification", "skip-verification",
"Skips the verification of the converted configuration", false), "Skips the verification of the converted configuration", false),
ENABLE_ASYNC_SCHEDULER("enable asynchronous scheduler", "a", "enable-async-scheduler",
"Enables the Asynchronous scheduler which decouples the CapacityScheduler" +
" scheduling from Node Heartbeats.", false),
HELP("help", "h", "help", "Displays the list of options", false); HELP("help", "h", "help", "Displays the list of options", false);
private final String name; private final String name;
@ -220,6 +223,8 @@ public class FSConfigToCSConfigArgumentHandler {
conversionOptions.setDryRun(dryRun); conversionOptions.setDryRun(dryRun);
conversionOptions.setNoTerminalRuleCheck( conversionOptions.setNoTerminalRuleCheck(
cliParser.hasOption(CliOption.NO_TERMINAL_RULE_CHECK.shortSwitch)); cliParser.hasOption(CliOption.NO_TERMINAL_RULE_CHECK.shortSwitch));
conversionOptions.setEnableAsyncScheduler(
cliParser.hasOption(CliOption.ENABLE_ASYNC_SCHEDULER.shortSwitch));
checkOptionPresent(cliParser, CliOption.YARN_SITE); checkOptionPresent(cliParser, CliOption.YARN_SITE);
checkOutputDefined(cliParser, dryRun); checkOutputDefined(cliParser, dryRun);

View File

@ -270,7 +270,8 @@ public class FSConfigToCSConfigConverter {
FSYarnSiteConverter siteConverter = FSYarnSiteConverter siteConverter =
new FSYarnSiteConverter(); new FSYarnSiteConverter();
siteConverter.convertSiteProperties(inputYarnSiteConfig, siteConverter.convertSiteProperties(inputYarnSiteConfig,
convertedYarnSiteConfig, drfUsed); convertedYarnSiteConfig, drfUsed,
conversionOptions.isEnableAsyncScheduler());
// See docs: "allow-undeclared-pools" and "user-as-default-queue" are // See docs: "allow-undeclared-pools" and "user-as-default-queue" are
// ignored if we have placement rules // ignored if we have placement rules

View File

@ -38,7 +38,7 @@ public class FSYarnSiteConverter {
@SuppressWarnings({"deprecation", "checkstyle:linelength"}) @SuppressWarnings({"deprecation", "checkstyle:linelength"})
public void convertSiteProperties(Configuration conf, public void convertSiteProperties(Configuration conf,
Configuration yarnSiteConfig, boolean drfUsed) { Configuration yarnSiteConfig, boolean drfUsed, boolean enableAsyncScheduler) {
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER, yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName()); CapacityScheduler.class.getCanonicalName());
@ -131,6 +131,10 @@ public class FSYarnSiteConverter {
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getCanonicalName()); DominantResourceCalculator.class.getCanonicalName());
} }
if (enableAsyncScheduler) {
yarnSiteConfig.setBoolean(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
}
} }
public boolean isPreemptionEnabled() { public boolean isPreemptionEnabled() {

View File

@ -651,4 +651,35 @@ public class TestFSConfigToCSConfigArgumentHandler {
verifyZeroInteractions(mockValidator); verifyZeroInteractions(mockValidator);
} }
@Test
public void testEnabledAsyncScheduling() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p",
"-a");
argumentHandler.parseAndConvert(args);
assertTrue("-a switch had no effect",
conversionOptions.isEnableAsyncScheduler());
}
@Test
public void testDisabledAsyncScheduling() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p");
argumentHandler.parseAndConvert(args);
assertFalse("-a switch wasn't provided but async scheduling option is true",
conversionOptions.isEnableAsyncScheduler());
}
} }

View File

@ -717,6 +717,41 @@ public class TestFSConfigToCSConfigConverter {
any(Boolean.class)); any(Boolean.class));
} }
@Test
public void testConversionWhenAsyncSchedulingIsEnabled()
throws Exception {
boolean schedulingEnabledValue = testConversionWithAsyncSchedulingOption(true);
assertTrue("Asynchronous scheduling should be true", schedulingEnabledValue);
}
@Test
public void testConversionWhenAsyncSchedulingIsDisabled() throws Exception {
boolean schedulingEnabledValue = testConversionWithAsyncSchedulingOption(false);
assertEquals("Asynchronous scheduling should be the default value",
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE,
schedulingEnabledValue);
}
private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FAIR_SCHEDULER_XML)
.build();
ConversionOptions conversionOptions = createDefaultConversionOptions();
conversionOptions.setEnableAsyncScheduler(enabled);
converter = new FSConfigToCSConfigConverter(ruleHandler,
conversionOptions);
converter.convert(params);
Configuration convertedConfig = converter.getYarnSiteConfig();
return convertedConfig.getBoolean(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE);
}
private Configuration getConvertedCSConfig(String dir) throws IOException { private Configuration getConvertedCSConfig(String dir) throws IOException {
File capacityFile = new File(dir, "capacity-scheduler.xml"); File capacityFile = new File(dir, "capacity-scheduler.xml");
ByteArrayInputStream input = ByteArrayInputStream input =

View File

@ -26,6 +26,7 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
/** /**
@ -52,7 +53,8 @@ public class TestFSYarnSiteConverter {
yarnConfig.setInt( yarnConfig.setInt(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666); FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean( assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false)); CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false));
@ -70,7 +72,8 @@ public class TestFSYarnSiteConverter {
FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS, FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
321); 321);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertTrue("Preemption enabled", assertTrue("Preemption enabled",
yarnConvertedConfig.getBoolean( yarnConvertedConfig.getBoolean(
@ -90,7 +93,8 @@ public class TestFSYarnSiteConverter {
public void testSiteAssignMultipleConversion() { public void testSiteAssignMultipleConversion() {
yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertTrue("Assign multiple", assertTrue("Assign multiple",
yarnConvertedConfig.getBoolean( yarnConvertedConfig.getBoolean(
@ -102,7 +106,8 @@ public class TestFSYarnSiteConverter {
public void testSiteMaxAssignConversion() { public void testSiteMaxAssignConversion() {
yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111); yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertEquals("Max assign", 111, assertEquals("Max assign", 111,
yarnConvertedConfig.getInt( yarnConvertedConfig.getInt(
@ -116,7 +121,8 @@ public class TestFSYarnSiteConverter {
yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK,
"321.321"); "321.321");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertEquals("Locality threshold node", "123.123", assertEquals("Locality threshold node", "123.123",
yarnConvertedConfig.get( yarnConvertedConfig.get(
@ -128,7 +134,8 @@ public class TestFSYarnSiteConverter {
@Test @Test
public void testSiteDrfEnabledConversion() { public void testSiteDrfEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
false);
assertEquals("Resource calculator type", DominantResourceCalculator.class, assertEquals("Resource calculator type", DominantResourceCalculator.class,
yarnConvertedConfig.getClass( yarnConvertedConfig.getClass(
@ -137,11 +144,32 @@ public class TestFSYarnSiteConverter {
@Test @Test
public void testSiteDrfDisabledConversion() { public void testSiteDrfDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertEquals("Resource calculator type", DefaultResourceCalculator.class, assertEquals("Resource calculator type", DefaultResourceCalculator.class,
yarnConvertedConfig.getClass( yarnConvertedConfig.getClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS)); CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS));
} }
@Test
public void testAsyncSchedulingEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
true);
assertTrue("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE));
}
@Test
public void testAsyncSchedulingDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertFalse("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE));
}
} }