YARN-10085. FS-CS converter: remove mixed ordering policy check. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2020-01-28 15:22:12 +01:00
parent f876dc228b
commit ca29768035
12 changed files with 277 additions and 68 deletions

View File

@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -39,9 +40,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationCo
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -73,6 +75,7 @@ public class FSConfigToCSConfigConverter {
private boolean sizeBasedWeight = false; private boolean sizeBasedWeight = false;
private boolean userAsDefaultQueue = false; private boolean userAsDefaultQueue = false;
private ConversionOptions conversionOptions; private ConversionOptions conversionOptions;
private boolean drfUsed = false;
private Configuration yarnSiteConfig; private Configuration yarnSiteConfig;
private Configuration capacitySchedulerConfig; private Configuration capacitySchedulerConfig;
@ -208,6 +211,8 @@ public class FSConfigToCSConfigConverter {
fs.setRMContext(ctx); fs.setRMContext(ctx);
fs.init(conf); fs.init(conf);
drfUsed = isDrfUsed(fs);
AllocationConfiguration allocConf = fs.getAllocationConfiguration(); AllocationConfiguration allocConf = fs.getAllocationConfiguration();
queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault(); queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault();
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault(); queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();
@ -246,7 +251,7 @@ public class FSConfigToCSConfigConverter {
private void convertYarnSiteXml(Configuration conf) { private void convertYarnSiteXml(Configuration conf) {
FSYarnSiteConverter siteConverter = FSYarnSiteConverter siteConverter =
new FSYarnSiteConverter(); new FSYarnSiteConverter();
siteConverter.convertSiteProperties(conf, yarnSiteConfig); siteConverter.convertSiteProperties(conf, yarnSiteConfig, drfUsed);
autoCreateChildQueues = siteConverter.isAutoCreateChildQueues(); autoCreateChildQueues = siteConverter.isAutoCreateChildQueues();
preemptionEnabled = siteConverter.isPreemptionEnabled(); preemptionEnabled = siteConverter.isPreemptionEnabled();
@ -271,6 +276,7 @@ public class FSConfigToCSConfigConverter {
.withQueueMaxAMShareDefault(queueMaxAMShareDefault) .withQueueMaxAMShareDefault(queueMaxAMShareDefault)
.withQueueMaxAppsDefault(queueMaxAppsDefault) .withQueueMaxAppsDefault(queueMaxAppsDefault)
.withConversionOptions(conversionOptions) .withConversionOptions(conversionOptions)
.withDrfUsed(drfUsed)
.build(); .build();
queueConverter.convertQueueHierarchy(rootQueue); queueConverter.convertQueueHierarchy(rootQueue);
@ -287,18 +293,6 @@ public class FSConfigToCSConfigConverter {
ruleHandler, userAsDefaultQueue); ruleHandler, userAsDefaultQueue);
properties.forEach((k, v) -> capacitySchedulerConfig.set(k, v)); properties.forEach((k, v) -> capacitySchedulerConfig.set(k, v));
} }
// Validate ordering policy
if (queueConverter.isDrfPolicyUsedOnQueueLevel()) {
if (queueConverter.isFifoOrFairSharePolicyUsed()) {
throw new ConversionException(
"DRF ordering policy cannot be used together with fifo/fair");
} else {
capacitySchedulerConfig.set(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getCanonicalName());
}
}
} }
private void emitDefaultMaxApplications() { private void emitDefaultMaxApplications() {
@ -359,6 +353,38 @@ public class FSConfigToCSConfigConverter {
} }
} }
private boolean isDrfUsed(FairScheduler fs) {
FSQueue rootQueue = fs.getQueueManager().getRootQueue();
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
String defaultPolicy = allocConf.getDefaultSchedulingPolicy().getName();
if (DominantResourceFairnessPolicy.NAME.equals(defaultPolicy)) {
return true;
} else {
return isDrfUsedOnQueueLevel(rootQueue);
}
}
private boolean isDrfUsedOnQueueLevel(FSQueue queue) {
String policy = queue.getPolicy().getName();
boolean usesDrf = DominantResourceFairnessPolicy.NAME.equals(policy);
if (usesDrf) {
return true;
} else {
List<FSQueue> children = queue.getChildQueues();
if (children != null) {
for (FSQueue child : children) {
usesDrf |= isDrfUsedOnQueueLevel(child);
}
}
return usesDrf;
}
}
@VisibleForTesting @VisibleForTesting
Resource getClusterResource() { Resource getClusterResource() {
return clusterResource; return clusterResource;
@ -373,4 +399,9 @@ public class FSConfigToCSConfigConverter {
FSConfigToCSConfigRuleHandler getRuleHandler() { FSConfigToCSConfigRuleHandler getRuleHandler() {
return ruleHandler; return ruleHandler;
} }
@VisibleForTesting
Configuration getYarnSiteConfig() {
return yarnSiteConfig;
}
} }

View File

@ -73,6 +73,9 @@ public class FSConfigToCSConfigRuleHandler {
public static final String QUEUE_AUTO_CREATE = public static final String QUEUE_AUTO_CREATE =
"queueAutoCreate.action"; "queueAutoCreate.action";
public static final String FAIR_AS_DRF =
"fairAsDrf.action";
@VisibleForTesting @VisibleForTesting
enum RuleAction { enum RuleAction {
WARNING, WARNING,
@ -119,6 +122,7 @@ public class FSConfigToCSConfigRuleHandler {
setActionForProperty(SPECIFIED_NOT_FIRST); setActionForProperty(SPECIFIED_NOT_FIRST);
setActionForProperty(RESERVATION_SYSTEM); setActionForProperty(RESERVATION_SYSTEM);
setActionForProperty(QUEUE_AUTO_CREATE); setActionForProperty(QUEUE_AUTO_CREATE);
setActionForProperty(FAIR_AS_DRF);
} }
public void handleMaxCapacityPercentage(String queueName) { public void handleMaxCapacityPercentage(String queueName) {
@ -181,6 +185,14 @@ public class FSConfigToCSConfigRuleHandler {
placementRule)); placementRule));
} }
public void handleFairAsDrf(String queueName) {
handle(FAIR_AS_DRF,
null,
format(
"Queue %s will use DRF policy instead of Fair",
queueName));
}
private void handle(String actionName, String fsSetting, String message) { private void handle(String actionName, String fsSetting, String message) {
RuleAction action = actions.get(actionName); RuleAction action = actions.get(actionName);

View File

@ -55,9 +55,7 @@ public class FSQueueConverter {
private final float queueMaxAMShareDefault; private final float queueMaxAMShareDefault;
private final boolean autoCreateChildQueues; private final boolean autoCreateChildQueues;
private final int queueMaxAppsDefault; private final int queueMaxAppsDefault;
private final boolean drfUsed;
private boolean fifoOrFairSharePolicyUsed;
private boolean drfPolicyUsedOnQueueLevel;
private ConversionOptions conversionOptions; private ConversionOptions conversionOptions;
@ -72,6 +70,7 @@ public class FSQueueConverter {
this.autoCreateChildQueues = builder.autoCreateChildQueues; this.autoCreateChildQueues = builder.autoCreateChildQueues;
this.queueMaxAppsDefault = builder.queueMaxAppsDefault; this.queueMaxAppsDefault = builder.queueMaxAppsDefault;
this.conversionOptions = builder.conversionOptions; this.conversionOptions = builder.conversionOptions;
this.drfUsed = builder.drfUsed;
} }
public void convertQueueHierarchy(FSQueue queue) { public void convertQueueHierarchy(FSQueue queue) {
@ -105,14 +104,6 @@ public class FSQueueConverter {
} }
} }
public boolean isFifoOrFairSharePolicyUsed() {
return fifoOrFairSharePolicyUsed;
}
public boolean isDrfPolicyUsedOnQueueLevel() {
return drfPolicyUsedOnQueueLevel;
}
/** /**
* Generates yarn.scheduler.capacity.&lt;queue-name&gt;.queues. * Generates yarn.scheduler.capacity.&lt;queue-name&gt;.queues.
* @param queueName * @param queueName
@ -306,20 +297,20 @@ public class FSQueueConverter {
String policy = queue.getPolicy().getName(); String policy = queue.getPolicy().getName();
switch (policy) { switch (policy) {
case DominantResourceFairnessPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FairSharePolicy.NAME);
break;
case FairSharePolicy.NAME: case FairSharePolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FairSharePolicy.NAME); + ".ordering-policy", FairSharePolicy.NAME);
fifoOrFairSharePolicyUsed = true; if (drfUsed) {
ruleHandler.handleFairAsDrf(queueName);
}
break; break;
case FifoPolicy.NAME: case FifoPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FifoPolicy.NAME); + ".ordering-policy", FifoPolicy.NAME);
fifoOrFairSharePolicyUsed = true;
break;
case DominantResourceFairnessPolicy.NAME:
// DRF is not supported on a queue level,
// it has to be global
drfPolicyUsedOnQueueLevel = true;
break; break;
default: default:
String msg = String.format("Unexpected ordering policy " + String msg = String.format("Unexpected ordering policy " +

View File

@ -32,6 +32,7 @@ public final class FSQueueConverterBuilder {
float queueMaxAMShareDefault; float queueMaxAMShareDefault;
int queueMaxAppsDefault; int queueMaxAppsDefault;
ConversionOptions conversionOptions; ConversionOptions conversionOptions;
boolean drfUsed;
private FSQueueConverterBuilder() { private FSQueueConverterBuilder() {
} }
@ -94,6 +95,11 @@ public final class FSQueueConverterBuilder {
return this; return this;
} }
public FSQueueConverterBuilder withDrfUsed(boolean drfUsed) {
this.drfUsed = drfUsed;
return this;
}
public FSQueueConverter build() { public FSQueueConverter build() {
return new FSQueueConverter(this); return new FSQueueConverter(this);
} }

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
/** /**
* Converts a Fair Scheduler site configuration to Capacity Scheduler * Converts a Fair Scheduler site configuration to Capacity Scheduler
@ -37,7 +38,7 @@ public class FSYarnSiteConverter {
@SuppressWarnings({"deprecation", "checkstyle:linelength"}) @SuppressWarnings({"deprecation", "checkstyle:linelength"})
public void convertSiteProperties(Configuration conf, public void convertSiteProperties(Configuration conf,
Configuration yarnSiteConfig) { Configuration yarnSiteConfig, boolean drfUsed) {
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER, yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName()); CapacityScheduler.class.getCanonicalName());
@ -139,6 +140,12 @@ public class FSYarnSiteConverter {
FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE)) { FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE)) {
userAsDefaultQueue = true; userAsDefaultQueue = true;
} }
if (drfUsed) {
yarnSiteConfig.set(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getCanonicalName());
}
} }
public boolean isPreemptionEnabled() { public boolean isPreemptionEnabled() {

View File

@ -82,7 +82,6 @@ class QueuePlacementConverter {
} }
mapping.append("u:" + USER + ":").append(defaultRule.defaultQueueName); mapping.append("u:" + USER + ":").append(defaultRule.defaultQueueName);
} else if (rule instanceof SecondaryGroupExistingPlacementRule) { } else if (rule instanceof SecondaryGroupExistingPlacementRule) {
// TODO: wait for YARN-9840
if (mapping.length() > 0) { if (mapping.length() > 0) {
mapping.append(";"); mapping.append(";");
} }
@ -107,10 +106,8 @@ class QueuePlacementConverter {
mapping.append(";"); mapping.append(";");
} }
if (pr instanceof PrimaryGroupPlacementRule) { if (pr instanceof PrimaryGroupPlacementRule) {
// TODO: wait for YARN-9841
mapping.append("u:" + USER + ":" + PRIMARY_GROUP + "." + USER); mapping.append("u:" + USER + ":" + PRIMARY_GROUP + "." + USER);
} else if (pr instanceof SecondaryGroupExistingPlacementRule) { } else if (pr instanceof SecondaryGroupExistingPlacementRule) {
// TODO: wait for YARN-9865
mapping.append("u:" + USER + ":" + SECONDARY_GROUP + "." + USER); mapping.append("u:" + USER + ":" + SECONDARY_GROUP + "." + USER);
} else if (pr instanceof DefaultPlacementRule) { } else if (pr instanceof DefaultPlacementRule) {
DefaultPlacementRule defaultRule = (DefaultPlacementRule) pr; DefaultPlacementRule defaultRule = (DefaultPlacementRule) pr;

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -60,6 +61,8 @@ import org.mockito.junit.MockitoJUnitRunner;
*/ */
@RunWith(MockitoJUnitRunner.class) @RunWith(MockitoJUnitRunner.class)
public class TestFSConfigToCSConfigConverter { public class TestFSConfigToCSConfigConverter {
private static final String CLUSTER_RESOURCE_STRING =
"vcores=20, memory-mb=240";
private static final Resource CLUSTER_RESOURCE = private static final Resource CLUSTER_RESOURCE =
Resource.newInstance(16384, 16); Resource.newInstance(16384, 16);
private static final String FILE_PREFIX = "file:"; private static final String FILE_PREFIX = "file:";
@ -67,6 +70,10 @@ public class TestFSConfigToCSConfigConverter {
prepareFileName("fair-scheduler-conversion.xml"); prepareFileName("fair-scheduler-conversion.xml");
private static final String FS_INVALID_PLACEMENT_RULES_XML = private static final String FS_INVALID_PLACEMENT_RULES_XML =
prepareFileName("fair-scheduler-invalidplacementrules.xml"); prepareFileName("fair-scheduler-invalidplacementrules.xml");
private static final String FS_ONLY_FAIR_POLICY_XML =
prepareFileName("fair-scheduler-onlyfairpolicy.xml");
private static final String FS_MIXED_POLICY_XML =
prepareFileName("fair-scheduler-orderingpolicy-mixed.xml");
@Mock @Mock
private FSConfigToCSConfigRuleHandler ruleHandler; private FSConfigToCSConfigRuleHandler ruleHandler;
@ -215,20 +222,6 @@ public class TestFSConfigToCSConfigConverter {
conf.getInt(PREFIX + "maximum-applications", -1)); conf.getInt(PREFIX + "maximum-applications", -1));
} }
@Test
public void testMixedQueueOrderingPolicy() throws Exception {
expectedException.expect(ConversionException.class);
expectedException.expectMessage(
"DRF ordering policy cannot be used together with fifo/fair");
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy-mixed.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
converter.convert(config);
}
@Test @Test
public void testQueueMaxChildCapacityNotSupported() throws Exception { public void testQueueMaxChildCapacityNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class); expectedException.expect(UnsupportedPropertyException.class);
@ -277,7 +270,7 @@ public class TestFSConfigToCSConfigConverter {
@Test @Test
public void testConvertFSConfigurationClusterResource() throws Exception { public void testConvertFSConfigurationClusterResource() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder() FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.build(); .build();
converter.convert(params); converter.convert(params);
assertEquals("Resource", Resource.newInstance(240, 20), assertEquals("Resource", Resource.newInstance(240, 20),
@ -288,7 +281,7 @@ public class TestFSConfigToCSConfigConverter {
public void testConvertFSConfigPctModeUsedAndClusterResourceDefined() public void testConvertFSConfigPctModeUsedAndClusterResourceDefined()
throws Exception { throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder() FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.build(); .build();
converter.convert(params); converter.convert(params);
assertEquals("Resource", Resource.newInstance(240, 20), assertEquals("Resource", Resource.newInstance(240, 20),
@ -394,7 +387,7 @@ public class TestFSConfigToCSConfigConverter {
@Test @Test
public void testConvertCheckOutputDir() throws Exception { public void testConvertCheckOutputDir() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder() FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.build(); .build();
converter.convert(params); converter.convert(params);
@ -419,7 +412,7 @@ public class TestFSConfigToCSConfigConverter {
throws Exception { throws Exception {
FSConfigToCSConfigConverterParams params = FSConfigToCSConfigConverterParams params =
createParamsBuilder(YARN_SITE_XML_NO_REF_TO_FS_XML) createParamsBuilder(YARN_SITE_XML_NO_REF_TO_FS_XML)
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.build(); .build();
expectedException.expect(PreconditionException.class); expectedException.expect(PreconditionException.class);
@ -430,7 +423,7 @@ public class TestFSConfigToCSConfigConverter {
@Test @Test
public void testInvalidFairSchedulerXml() throws Exception { public void testInvalidFairSchedulerXml() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder() FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FAIR_SCHEDULER_XML_INVALID) .withFairSchedulerXmlConfig(FAIR_SCHEDULER_XML_INVALID)
.build(); .build();
@ -442,7 +435,7 @@ public class TestFSConfigToCSConfigConverter {
public void testInvalidYarnSiteXml() throws Exception { public void testInvalidYarnSiteXml() throws Exception {
FSConfigToCSConfigConverterParams params = FSConfigToCSConfigConverterParams params =
createParamsBuilder(YARN_SITE_XML_INVALID) createParamsBuilder(YARN_SITE_XML_INVALID)
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.build(); .build();
expectedException.expect(RuntimeException.class); expectedException.expect(RuntimeException.class);
@ -464,7 +457,7 @@ public class TestFSConfigToCSConfigConverter {
public void testConversionWhenInvalidPlacementRulesIgnored() public void testConversionWhenInvalidPlacementRulesIgnored()
throws Exception { throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder() FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240") .withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FS_INVALID_PLACEMENT_RULES_XML) .withFairSchedulerXmlConfig(FS_INVALID_PLACEMENT_RULES_XML)
.build(); .build();
@ -479,6 +472,38 @@ public class TestFSConfigToCSConfigConverter {
// expected: no exception // expected: no exception
} }
@Test
public void testConversionWhenOnlyFairPolicyIsUsed() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FS_ONLY_FAIR_POLICY_XML)
.build();
converter.convert(params);
Configuration convertedConfig = converter.getYarnSiteConfig();
assertEquals("Resource calculator class shouldn't be set", null,
convertedConfig.getClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, null));
}
@Test
public void testConversionWhenMixedPolicyIsUsed() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FS_MIXED_POLICY_XML)
.build();
converter.convert(params);
Configuration convertedConfig = converter.getYarnSiteConfig();
assertEquals("Resource calculator type", DominantResourceCalculator.class,
convertedConfig.getClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, null));
}
private Configuration getConvertedCSConfig() { private Configuration getConvertedCSConfig() {
ByteArrayInputStream input = ByteArrayInputStream input =
new ByteArrayInputStream(csConfigOut.toByteArray()); new ByteArrayInputStream(csConfigOut.toByteArray());

View File

@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.conve
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.SPECIFIED_NOT_FIRST; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.SPECIFIED_NOT_FIRST;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_APPS_DEFAULT; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_APPS_DEFAULT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.FAIR_AS_DRF;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -86,6 +87,7 @@ public class TestFSConfigToCSConfigRuleHandler {
rules.put(SPECIFIED_NOT_FIRST, WARNING); rules.put(SPECIFIED_NOT_FIRST, WARNING);
rules.put(USER_MAX_APPS_DEFAULT, WARNING); rules.put(USER_MAX_APPS_DEFAULT, WARNING);
rules.put(USER_MAX_RUNNING_APPS, WARNING); rules.put(USER_MAX_RUNNING_APPS, WARNING);
rules.put(FAIR_AS_DRF, WARNING);
ruleHandler = new FSConfigToCSConfigRuleHandler(rules, ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
createDefaultConversionOptions()); createDefaultConversionOptions());
@ -111,6 +113,8 @@ public class TestFSConfigToCSConfigRuleHandler {
rules.put(SPECIFIED_NOT_FIRST, ABORT); rules.put(SPECIFIED_NOT_FIRST, ABORT);
rules.put(USER_MAX_APPS_DEFAULT, ABORT); rules.put(USER_MAX_APPS_DEFAULT, ABORT);
rules.put(USER_MAX_RUNNING_APPS, ABORT); rules.put(USER_MAX_RUNNING_APPS, ABORT);
rules.put(USER_MAX_RUNNING_APPS, ABORT);
rules.put(FAIR_AS_DRF, ABORT);
rules.put(MAX_CHILD_QUEUE_LIMIT, "1"); rules.put(MAX_CHILD_QUEUE_LIMIT, "1");
ruleHandler = new FSConfigToCSConfigRuleHandler(rules, ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
@ -126,6 +130,7 @@ public class TestFSConfigToCSConfigRuleHandler {
expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule()); expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule());
expectAbort(() -> ruleHandler.handleUserMaxApps()); expectAbort(() -> ruleHandler.handleUserMaxApps());
expectAbort(() -> ruleHandler.handleUserMaxAppsDefault()); expectAbort(() -> ruleHandler.handleUserMaxAppsDefault());
expectAbort(() -> ruleHandler.handleFairAsDrf("test"));
} }
@Test(expected = ConversionException.class) @Test(expected = ConversionException.class)

View File

@ -20,6 +20,9 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -400,6 +403,23 @@ public class TestFSQueueConverter {
csConfig.get(PREFIX + "root.admins.bob.ordering-policy")); csConfig.get(PREFIX + "root.admins.bob.ordering-policy"));
} }
@Test
public void testQueueUnsupportedMixedOrderingPolicy() throws IOException {
converter = builder.withDrfUsed(true).build();
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy-mixed.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
verify(ruleHandler, times(6)).handleFairAsDrf(anyString());
}
@Test @Test
public void testQueueMaxChildCapacityNotSupported() { public void testQueueMaxChildCapacityNotSupported() {
converter = builder.build(); converter = builder.build();

View File

@ -20,6 +20,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -50,7 +52,7 @@ public class TestFSYarnSiteConverter {
yarnConfig.setInt( yarnConfig.setInt(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666); FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean( assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false)); CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false));
@ -65,7 +67,7 @@ public class TestFSYarnSiteConverter {
yarnConfig.setInt("yarn.resource-types.memory-mb.increment-allocation", 11); yarnConfig.setInt("yarn.resource-types.memory-mb.increment-allocation", 11);
yarnConfig.setInt("yarn.resource-types.vcores.increment-allocation", 5); yarnConfig.setInt("yarn.resource-types.vcores.increment-allocation", 5);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertEquals("Memory alloc increment", 11, assertEquals("Memory alloc increment", 11,
yarnConvertedConfig.getInt("yarn.scheduler.minimum-allocation-mb", yarnConvertedConfig.getInt("yarn.scheduler.minimum-allocation-mb",
@ -83,7 +85,7 @@ public class TestFSYarnSiteConverter {
FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS, FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
321); 321);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertTrue("Preemption enabled", assertTrue("Preemption enabled",
yarnConvertedConfig.getBoolean( yarnConvertedConfig.getBoolean(
@ -103,7 +105,7 @@ public class TestFSYarnSiteConverter {
public void testSiteAssignMultipleConversion() { public void testSiteAssignMultipleConversion() {
yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true); yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertTrue("Assign multiple", assertTrue("Assign multiple",
yarnConvertedConfig.getBoolean( yarnConvertedConfig.getBoolean(
@ -115,7 +117,7 @@ public class TestFSYarnSiteConverter {
public void testSiteMaxAssignConversion() { public void testSiteMaxAssignConversion() {
yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111); yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertEquals("Max assign", 111, assertEquals("Max assign", 111,
yarnConvertedConfig.getInt( yarnConvertedConfig.getInt(
@ -129,7 +131,7 @@ public class TestFSYarnSiteConverter {
yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK,
"321.321"); "321.321");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig); converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertEquals("Locality threshold node", "123.123", assertEquals("Locality threshold node", "123.123",
yarnConvertedConfig.get( yarnConvertedConfig.get(
@ -138,4 +140,23 @@ public class TestFSYarnSiteConverter {
yarnConvertedConfig.get( yarnConvertedConfig.get(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY)); CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY));
} }
@Test
public void testSiteDrfEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true);
assertEquals("Resource calculator type", DominantResourceCalculator.class,
yarnConvertedConfig.getClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, null));
}
@Test
public void testSiteDrfDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
assertEquals("Resource calculator type", DefaultResourceCalculator.class,
yarnConvertedConfig.getClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS));
}
} }

View File

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
<reservation>memory-mb=16384, vcores=4</reservation>
</queue>
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<user name="alice">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>10</userMaxAppsDefault>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>

View File

@ -31,7 +31,7 @@
<schedulingPolicy>drf</schedulingPolicy> <schedulingPolicy>drf</schedulingPolicy>
<queue name="john"> <queue name="john">
<weight>1.0</weight> <weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy> <schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps> <aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps> <aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation> <maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
@ -40,7 +40,7 @@
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources> <maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight> <weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom> <allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy> <schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps> <aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps> <aclAdministerApps>joe </aclAdministerApps>
</queue> </queue>
@ -55,7 +55,7 @@
<maxRunningApps>2</maxRunningApps> <maxRunningApps>2</maxRunningApps>
<weight>3.0</weight> <weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom> <allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy> <schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps> <aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps> <aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare> <maxAMShare>0.15</maxAMShare>
@ -63,7 +63,7 @@
<queue name="bob"> <queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources> <maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight> <weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy> <schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps> <aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps> <aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare> <maxAMShare>-1.0</maxAMShare>