diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/ConversionOptions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/ConversionOptions.java index 7fec0a80ad8..aae1d5547a8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/ConversionOptions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/ConversionOptions.java @@ -22,6 +22,7 @@ public class ConversionOptions { private DryRunResultHolder dryRunResultHolder; private boolean dryRun; private boolean noTerminalRuleCheck; + private boolean enableAsyncScheduler; public ConversionOptions(DryRunResultHolder dryRunResultHolder, boolean dryRun) { @@ -41,6 +42,14 @@ public boolean isNoRuleTerminalCheck() { return noTerminalRuleCheck; } + public void setEnableAsyncScheduler(boolean enableAsyncScheduler) { + this.enableAsyncScheduler = enableAsyncScheduler; + } + + public boolean isEnableAsyncScheduler() { + return enableAsyncScheduler; + } + public void handleWarning(String msg, Logger log) { if (dryRun) { dryRunResultHolder.addDryRunWarning(msg); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java index 5bd3b1a52ba..c2554a440b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigArgumentHandler.java @@ -109,6 +109,9 @@ public enum CliOption { SKIP_VERIFICATION("skip verification", "s", "skip-verification", "Skips the verification of the converted configuration", false), + ENABLE_ASYNC_SCHEDULER("enable asynchronous scheduler", "a", "enable-async-scheduler", + "Enables the Asynchronous scheduler which decouples the CapacityScheduler" + + " scheduling from Node Heartbeats.", false), HELP("help", "h", "help", "Displays the list of options", false); private final String name; @@ -220,6 +223,8 @@ private FSConfigToCSConfigConverter prepareAndGetConverter( conversionOptions.setDryRun(dryRun); conversionOptions.setNoTerminalRuleCheck( cliParser.hasOption(CliOption.NO_TERMINAL_RULE_CHECK.shortSwitch)); + conversionOptions.setEnableAsyncScheduler( + cliParser.hasOption(CliOption.ENABLE_ASYNC_SCHEDULER.shortSwitch)); checkOptionPresent(cliParser, CliOption.YARN_SITE); checkOutputDefined(cliParser, dryRun); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java index 368bc1f4937..5acf3567252 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSConfigToCSConfigConverter.java @@ -270,7 +270,8 @@ private void convertYarnSiteXml(Configuration inputYarnSiteConfig, FSYarnSiteConverter siteConverter = new FSYarnSiteConverter(); siteConverter.convertSiteProperties(inputYarnSiteConfig, - convertedYarnSiteConfig, drfUsed); + convertedYarnSiteConfig, drfUsed, + conversionOptions.isEnableAsyncScheduler()); // See docs: "allow-undeclared-pools" and "user-as-default-queue" are // ignored if we have placement rules diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java index 8e5f92aa6d4..86e4cd3dfef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/FSYarnSiteConverter.java @@ -38,7 +38,7 @@ public class FSYarnSiteConverter { @SuppressWarnings({"deprecation", "checkstyle:linelength"}) public void convertSiteProperties(Configuration conf, - Configuration yarnSiteConfig, boolean drfUsed) { + Configuration yarnSiteConfig, boolean drfUsed, boolean enableAsyncScheduler) { yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class.getCanonicalName()); @@ -131,6 +131,10 @@ public void convertSiteProperties(Configuration conf, CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, DominantResourceCalculator.class.getCanonicalName()); } + + if (enableAsyncScheduler) { + yarnSiteConfig.setBoolean(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true); + } } public boolean isPreemptionEnabled() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigArgumentHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigArgumentHandler.java index 9f41d314794..c3f380e2ea8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigArgumentHandler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigArgumentHandler.java @@ -651,4 +651,35 @@ public void testValidationSkippedWhenOutputIsConsole() throws Exception { verifyZeroInteractions(mockValidator); } + + @Test + public void testEnabledAsyncScheduling() throws Exception { + setupFSConfigConversionFiles(true); + + FSConfigToCSConfigArgumentHandler argumentHandler = + new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator); + + String[] args = getArgumentsAsArrayWithDefaults("-f", + FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p", + "-a"); + argumentHandler.parseAndConvert(args); + + assertTrue("-a switch had no effect", + conversionOptions.isEnableAsyncScheduler()); + } + + @Test + public void testDisabledAsyncScheduling() throws Exception { + setupFSConfigConversionFiles(true); + + FSConfigToCSConfigArgumentHandler argumentHandler = + new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator); + + String[] args = getArgumentsAsArrayWithDefaults("-f", + FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p"); + argumentHandler.parseAndConvert(args); + + assertFalse("-a switch wasn't provided but async scheduling option is true", + conversionOptions.isEnableAsyncScheduler()); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java index dd3e6d45888..46e1fb39ad6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSConfigToCSConfigConverter.java @@ -717,6 +717,41 @@ public void testPlacementRulesConversionEnabled() throws Exception { any(Boolean.class)); } + @Test + public void testConversionWhenAsyncSchedulingIsEnabled() + throws Exception { + boolean schedulingEnabledValue = testConversionWithAsyncSchedulingOption(true); + assertTrue("Asynchronous scheduling should be true", schedulingEnabledValue); + } + + @Test + public void testConversionWhenAsyncSchedulingIsDisabled() throws Exception { + boolean schedulingEnabledValue = testConversionWithAsyncSchedulingOption(false); + assertEquals("Asynchronous scheduling should be the default value", + CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE, + schedulingEnabledValue); + } + + private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception { + FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder() + .withClusterResource(CLUSTER_RESOURCE_STRING) + .withFairSchedulerXmlConfig(FAIR_SCHEDULER_XML) + .build(); + + ConversionOptions conversionOptions = createDefaultConversionOptions(); + conversionOptions.setEnableAsyncScheduler(enabled); + + converter = new FSConfigToCSConfigConverter(ruleHandler, + conversionOptions); + + converter.convert(params); + + Configuration convertedConfig = converter.getYarnSiteConfig(); + + return convertedConfig.getBoolean(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, + CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE); + } + private Configuration getConvertedCSConfig(String dir) throws IOException { File capacityFile = new File(dir, "capacity-scheduler.xml"); ByteArrayInputStream input = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java index 1597d5054a9..9cebf16f8be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/converter/TestFSYarnSiteConverter.java @@ -26,6 +26,7 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; /** @@ -52,7 +53,8 @@ public void testSiteContinuousSchedulingConversion() { yarnConfig.setInt( FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666); - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false)); @@ -70,7 +72,8 @@ public void testSitePreemptionConversion() { FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS, 321); - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); assertTrue("Preemption enabled", yarnConvertedConfig.getBoolean( @@ -90,7 +93,8 @@ public void testSitePreemptionConversion() { public void testSiteAssignMultipleConversion() { yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); assertTrue("Assign multiple", yarnConvertedConfig.getBoolean( @@ -102,7 +106,8 @@ public void testSiteAssignMultipleConversion() { public void testSiteMaxAssignConversion() { yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111); - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); assertEquals("Max assign", 111, yarnConvertedConfig.getInt( @@ -116,7 +121,8 @@ public void testSiteLocalityThresholdConversion() { yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, "321.321"); - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); assertEquals("Locality threshold node", "123.123", yarnConvertedConfig.get( @@ -128,7 +134,8 @@ public void testSiteLocalityThresholdConversion() { @Test public void testSiteDrfEnabledConversion() { - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true, + false); assertEquals("Resource calculator type", DominantResourceCalculator.class, yarnConvertedConfig.getClass( @@ -137,11 +144,32 @@ public void testSiteDrfEnabledConversion() { @Test public void testSiteDrfDisabledConversion() { - converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false); + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); assertEquals("Resource calculator type", DefaultResourceCalculator.class, yarnConvertedConfig.getClass( CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS)); } + + @Test + public void testAsyncSchedulingEnabledConversion() { + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true, + true); + + assertTrue("Asynchronous scheduling", yarnConvertedConfig.getBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, + CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE)); + } + + @Test + public void testAsyncSchedulingDisabledConversion() { + converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, + false); + + assertFalse("Asynchronous scheduling", yarnConvertedConfig.getBoolean( + CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, + CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE)); + } }