YARN-10067. Add dry-run feature to FS-CS converter tool. Contributed by Peter Bacsko
This commit is contained in:
parent
cebce0a348
commit
24e6a9e43a
@ -0,0 +1,80 @@
|
|||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
|
||||||
|
public class ConversionOptions {
|
||||||
|
private DryRunResultHolder dryRunResultHolder;
|
||||||
|
private boolean dryRun;
|
||||||
|
|
||||||
|
public ConversionOptions(DryRunResultHolder dryRunResultHolder,
|
||||||
|
boolean dryRun) {
|
||||||
|
this.dryRunResultHolder = dryRunResultHolder;
|
||||||
|
this.dryRun = dryRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setDryRun(boolean dryRun) {
|
||||||
|
this.dryRun = dryRun;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleWarning(String msg, Logger log) {
|
||||||
|
if (dryRun) {
|
||||||
|
dryRunResultHolder.addDryRunWarning(msg);
|
||||||
|
} else {
|
||||||
|
log.warn(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleError(String msg) {
|
||||||
|
if (dryRun) {
|
||||||
|
dryRunResultHolder.addDryRunError(msg);
|
||||||
|
} else {
|
||||||
|
throw new UnsupportedPropertyException(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleConversionError(String msg) {
|
||||||
|
if (dryRun) {
|
||||||
|
dryRunResultHolder.addDryRunError(msg);
|
||||||
|
} else {
|
||||||
|
throw new ConversionException(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handlePreconditionError(String msg) {
|
||||||
|
if (dryRun) {
|
||||||
|
dryRunResultHolder.addDryRunError(msg);
|
||||||
|
} else {
|
||||||
|
throw new PreconditionException(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleParsingFinished() {
|
||||||
|
if (dryRun) {
|
||||||
|
dryRunResultHolder.printDryRunResults();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void handleGenericException(Exception e, String msg) {
|
||||||
|
if (dryRun) {
|
||||||
|
dryRunResultHolder.addDryRunError(msg);
|
||||||
|
} else {
|
||||||
|
FSConfigToCSConfigArgumentHandler.logAndStdErr(e, msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,80 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
|
||||||
|
public class DryRunResultHolder {
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(DryRunResultHolder.class);
|
||||||
|
|
||||||
|
private Set<String> warnings;
|
||||||
|
private Set<String> errors;
|
||||||
|
|
||||||
|
public DryRunResultHolder() {
|
||||||
|
this.warnings = new HashSet<>();
|
||||||
|
this.errors = new HashSet<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addDryRunWarning(String message) {
|
||||||
|
warnings.add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addDryRunError(String message) {
|
||||||
|
errors.add(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getWarnings() {
|
||||||
|
return ImmutableSet.copyOf(warnings);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Set<String> getErrors() {
|
||||||
|
return ImmutableSet.copyOf(errors);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void printDryRunResults() {
|
||||||
|
LOG.info("");
|
||||||
|
LOG.info("Results of dry run:");
|
||||||
|
LOG.info("");
|
||||||
|
|
||||||
|
int noOfErrors = errors.size();
|
||||||
|
int noOfWarnings = warnings.size();
|
||||||
|
|
||||||
|
LOG.info("Number of errors: {}", noOfErrors);
|
||||||
|
LOG.info("Number of warnings: {}", noOfWarnings);
|
||||||
|
|
||||||
|
if (noOfErrors > 0) {
|
||||||
|
LOG.info("");
|
||||||
|
LOG.info("List of errors:");
|
||||||
|
errors.forEach(s -> LOG.info(s));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (noOfWarnings > 0) {
|
||||||
|
LOG.info("");
|
||||||
|
LOG.info("List of warnings:");
|
||||||
|
warnings.forEach(s -> LOG.info(s));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -16,6 +16,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.GnuParser;
|
import org.apache.commons.cli.GnuParser;
|
||||||
import org.apache.commons.cli.HelpFormatter;
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
@ -25,7 +28,7 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Parses arguments passed to the FS->CS converter.
|
* Parses arguments passed to the FS->CS converter.
|
||||||
@ -35,11 +38,22 @@
|
|||||||
public class FSConfigToCSConfigArgumentHandler {
|
public class FSConfigToCSConfigArgumentHandler {
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(FSConfigToCSConfigArgumentHandler.class);
|
LoggerFactory.getLogger(FSConfigToCSConfigArgumentHandler.class);
|
||||||
private final FSConfigToCSConfigConverter converter;
|
|
||||||
|
|
||||||
public FSConfigToCSConfigArgumentHandler(FSConfigToCSConfigConverter
|
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
converter) {
|
private FSConfigToCSConfigConverterParams converterParams;
|
||||||
this.converter = converter;
|
private ConversionOptions conversionOptions;
|
||||||
|
|
||||||
|
private Supplier<FSConfigToCSConfigConverter>
|
||||||
|
converterFunc = this::getConverter;
|
||||||
|
|
||||||
|
public FSConfigToCSConfigArgumentHandler() {
|
||||||
|
this.conversionOptions = new ConversionOptions(new DryRunResultHolder(),
|
||||||
|
false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
FSConfigToCSConfigArgumentHandler(ConversionOptions conversionOptions) {
|
||||||
|
this.conversionOptions = conversionOptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -69,6 +83,8 @@ public enum CliOption {
|
|||||||
" capacity-scheduler.xml files." +
|
" capacity-scheduler.xml files." +
|
||||||
"Must have write permission for user who is running this script.",
|
"Must have write permission for user who is running this script.",
|
||||||
true),
|
true),
|
||||||
|
DRY_RUN("dry run", "d", "dry-run", "Performs a dry-run of the conversion." +
|
||||||
|
"Outputs whether the conversion is possible or not.", false),
|
||||||
HELP("help", "h", "help", "Displays the list of options", false);
|
HELP("help", "h", "help", "Displays the list of options", false);
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
@ -94,6 +110,7 @@ public Option createCommonsCliOption() {
|
|||||||
|
|
||||||
int parseAndConvert(String[] args) throws Exception {
|
int parseAndConvert(String[] args) throws Exception {
|
||||||
Options opts = createOptions();
|
Options opts = createOptions();
|
||||||
|
int retVal = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (args.length == 0) {
|
if (args.length == 0) {
|
||||||
@ -109,36 +126,41 @@ int parseAndConvert(String[] args) throws Exception {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
checkOptionPresent(cliParser, CliOption.YARN_SITE);
|
FSConfigToCSConfigConverter converter =
|
||||||
checkOutputDefined(cliParser);
|
prepareAndGetConverter(cliParser);
|
||||||
|
|
||||||
FSConfigToCSConfigConverterParams params = validateInputFiles(cliParser);
|
converter.convert(converterParams);
|
||||||
converter.convert(params);
|
|
||||||
} catch (ParseException e) {
|
} catch (ParseException e) {
|
||||||
String msg = "Options parsing failed: " + e.getMessage();
|
String msg = "Options parsing failed: " + e.getMessage();
|
||||||
logAndStdErr(e, msg);
|
logAndStdErr(e, msg);
|
||||||
printHelp(opts);
|
printHelp(opts);
|
||||||
return -1;
|
retVal = -1;
|
||||||
} catch (PreconditionException e) {
|
} catch (PreconditionException e) {
|
||||||
String msg = "Cannot start FS config conversion due to the following"
|
String msg = "Cannot start FS config conversion due to the following"
|
||||||
+ " precondition error: " + e.getMessage();
|
+ " precondition error: " + e.getMessage();
|
||||||
logAndStdErr(e, msg);
|
handleException(e, msg);
|
||||||
return -1;
|
retVal = -1;
|
||||||
} catch (UnsupportedPropertyException e) {
|
} catch (UnsupportedPropertyException e) {
|
||||||
String msg = "Unsupported property/setting encountered during FS config "
|
String msg = "Unsupported property/setting encountered during FS config "
|
||||||
+ "conversion: " + e.getMessage();
|
+ "conversion: " + e.getMessage();
|
||||||
logAndStdErr(e, msg);
|
handleException(e, msg);
|
||||||
return -1;
|
retVal = -1;
|
||||||
} catch (ConversionException | IllegalArgumentException e) {
|
} catch (ConversionException | IllegalArgumentException e) {
|
||||||
String msg = "Fatal error during FS config conversion: " + e.getMessage();
|
String msg = "Fatal error during FS config conversion: " + e.getMessage();
|
||||||
logAndStdErr(e, msg);
|
handleException(e, msg);
|
||||||
return -1;
|
retVal = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
conversionOptions.handleParsingFinished();
|
||||||
|
|
||||||
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void logAndStdErr(Exception e, String msg) {
|
private void handleException(Exception e, String msg) {
|
||||||
|
conversionOptions.handleGenericException(e, msg);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void logAndStdErr(Exception e, String msg) {
|
||||||
LOG.debug("Stack trace", e);
|
LOG.debug("Stack trace", e);
|
||||||
LOG.error(msg);
|
LOG.error(msg);
|
||||||
System.err.println(msg);
|
System.err.println(msg);
|
||||||
@ -154,6 +176,20 @@ private Options createOptions() {
|
|||||||
return opts;
|
return opts;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FSConfigToCSConfigConverter prepareAndGetConverter(
|
||||||
|
CommandLine cliParser) {
|
||||||
|
conversionOptions.setDryRun(
|
||||||
|
cliParser.hasOption(CliOption.DRY_RUN.shortSwitch));
|
||||||
|
|
||||||
|
checkOptionPresent(cliParser, CliOption.YARN_SITE);
|
||||||
|
checkOutputDefined(cliParser);
|
||||||
|
|
||||||
|
converterParams = validateInputFiles(cliParser);
|
||||||
|
ruleHandler = new FSConfigToCSConfigRuleHandler(conversionOptions);
|
||||||
|
|
||||||
|
return converterFunc.get();
|
||||||
|
}
|
||||||
|
|
||||||
private FSConfigToCSConfigConverterParams validateInputFiles(
|
private FSConfigToCSConfigConverterParams validateInputFiles(
|
||||||
CommandLine cliParser) {
|
CommandLine cliParser) {
|
||||||
String yarnSiteXmlFile =
|
String yarnSiteXmlFile =
|
||||||
@ -239,4 +275,14 @@ private static void checkFileInternal(CliOption cliOption, String filePath,
|
|||||||
"(As value of parameter %s)", filePath, cliOption.name));
|
"(As value of parameter %s)", filePath, cliOption.name));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private FSConfigToCSConfigConverter getConverter() {
|
||||||
|
return new FSConfigToCSConfigConverter(ruleHandler, conversionOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setConverterSupplier(Supplier<FSConfigToCSConfigConverter>
|
||||||
|
supplier) {
|
||||||
|
this.converterFunc = supplier;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,7 +65,6 @@ public class FSConfigToCSConfigConverter {
|
|||||||
"WARNING: This feature is experimental and not intended " +
|
"WARNING: This feature is experimental and not intended " +
|
||||||
"for production use!";
|
"for production use!";
|
||||||
|
|
||||||
|
|
||||||
private Resource clusterResource;
|
private Resource clusterResource;
|
||||||
private boolean preemptionEnabled = false;
|
private boolean preemptionEnabled = false;
|
||||||
private int queueMaxAppsDefault;
|
private int queueMaxAppsDefault;
|
||||||
@ -73,6 +72,7 @@ public class FSConfigToCSConfigConverter {
|
|||||||
private boolean autoCreateChildQueues = false;
|
private boolean autoCreateChildQueues = false;
|
||||||
private boolean sizeBasedWeight = false;
|
private boolean sizeBasedWeight = false;
|
||||||
private boolean userAsDefaultQueue = false;
|
private boolean userAsDefaultQueue = false;
|
||||||
|
private ConversionOptions conversionOptions;
|
||||||
|
|
||||||
private Configuration yarnSiteConfig;
|
private Configuration yarnSiteConfig;
|
||||||
private Configuration capacitySchedulerConfig;
|
private Configuration capacitySchedulerConfig;
|
||||||
@ -83,8 +83,9 @@ public class FSConfigToCSConfigConverter {
|
|||||||
private boolean consoleMode = false;
|
private boolean consoleMode = false;
|
||||||
|
|
||||||
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
|
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
|
||||||
ruleHandler) {
|
ruleHandler, ConversionOptions conversionOptions) {
|
||||||
this.ruleHandler = ruleHandler;
|
this.ruleHandler = ruleHandler;
|
||||||
|
this.conversionOptions = conversionOptions;
|
||||||
this.yarnSiteOutputStream = System.out;
|
this.yarnSiteOutputStream = System.out;
|
||||||
this.capacitySchedulerOutputStream = System.out;
|
this.capacitySchedulerOutputStream = System.out;
|
||||||
}
|
}
|
||||||
@ -257,14 +258,19 @@ private void convertCapacitySchedulerXml(FairScheduler fs) {
|
|||||||
FSParentQueue rootQueue = fs.getQueueManager().getRootQueue();
|
FSParentQueue rootQueue = fs.getQueueManager().getRootQueue();
|
||||||
emitDefaultMaxApplications();
|
emitDefaultMaxApplications();
|
||||||
emitDefaultMaxAMShare();
|
emitDefaultMaxAMShare();
|
||||||
FSQueueConverter queueConverter = new FSQueueConverter(ruleHandler,
|
|
||||||
capacitySchedulerConfig,
|
FSQueueConverter queueConverter = FSQueueConverterBuilder.create()
|
||||||
preemptionEnabled,
|
.withRuleHandler(ruleHandler)
|
||||||
sizeBasedWeight,
|
.withCapacitySchedulerConfig(capacitySchedulerConfig)
|
||||||
autoCreateChildQueues,
|
.withPreemptionEnabled(preemptionEnabled)
|
||||||
clusterResource,
|
.withSizeBasedWeight(sizeBasedWeight)
|
||||||
queueMaxAMShareDefault,
|
.withAutoCreateChildQueues(autoCreateChildQueues)
|
||||||
queueMaxAppsDefault);
|
.withClusterResource(clusterResource)
|
||||||
|
.withQueueMaxAMShareDefault(queueMaxAMShareDefault)
|
||||||
|
.withQueueMaxAppsDefault(queueMaxAppsDefault)
|
||||||
|
.withConversionOptions(conversionOptions)
|
||||||
|
.build();
|
||||||
|
|
||||||
queueConverter.convertQueueHierarchy(rootQueue);
|
queueConverter.convertQueueHierarchy(rootQueue);
|
||||||
emitACLs(fs);
|
emitACLs(fs);
|
||||||
|
|
||||||
|
@ -34,12 +34,8 @@ public class FSConfigToCSConfigConverterMain {
|
|||||||
|
|
||||||
public static void main(String[] args) {
|
public static void main(String[] args) {
|
||||||
try {
|
try {
|
||||||
FSConfigToCSConfigRuleHandler ruleHandler =
|
|
||||||
new FSConfigToCSConfigRuleHandler();
|
|
||||||
FSConfigToCSConfigConverter converter =
|
|
||||||
new FSConfigToCSConfigConverter(ruleHandler);
|
|
||||||
FSConfigToCSConfigArgumentHandler fsConfigConversionArgumentHandler =
|
FSConfigToCSConfigArgumentHandler fsConfigConversionArgumentHandler =
|
||||||
new FSConfigToCSConfigArgumentHandler(converter);
|
new FSConfigToCSConfigArgumentHandler();
|
||||||
int exitCode =
|
int exitCode =
|
||||||
fsConfigConversionArgumentHandler.parseAndConvert(args);
|
fsConfigConversionArgumentHandler.parseAndConvert(args);
|
||||||
if (exitCode != 0) {
|
if (exitCode != 0) {
|
||||||
|
@ -44,6 +44,7 @@ public class FSConfigToCSConfigRuleHandler {
|
|||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(FSConfigToCSConfigRuleHandler.class);
|
LoggerFactory.getLogger(FSConfigToCSConfigRuleHandler.class);
|
||||||
|
|
||||||
|
private ConversionOptions conversionOptions;
|
||||||
|
|
||||||
public static final String MAX_CHILD_QUEUE_LIMIT =
|
public static final String MAX_CHILD_QUEUE_LIMIT =
|
||||||
"maxChildQueue.limit";
|
"maxChildQueue.limit";
|
||||||
@ -94,15 +95,18 @@ void loadRulesFromFile(String ruleFile) throws IOException {
|
|||||||
initPropertyActions();
|
initPropertyActions();
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSConfigToCSConfigRuleHandler() {
|
public FSConfigToCSConfigRuleHandler(ConversionOptions conversionOptions) {
|
||||||
properties = new Properties();
|
this.properties = new Properties();
|
||||||
actions = new HashMap<>();
|
this.actions = new HashMap<>();
|
||||||
|
this.conversionOptions = conversionOptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
FSConfigToCSConfigRuleHandler(Properties props) {
|
FSConfigToCSConfigRuleHandler(Properties props,
|
||||||
properties = props;
|
ConversionOptions conversionOptions) {
|
||||||
actions = new HashMap<>();
|
this.properties = props;
|
||||||
|
this.actions = new HashMap<>();
|
||||||
|
this.conversionOptions = conversionOptions;
|
||||||
initPropertyActions();
|
initPropertyActions();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,14 +193,13 @@ private void handle(String actionName, String fsSetting, String message) {
|
|||||||
} else {
|
} else {
|
||||||
exceptionMessage = format("Setting %s is not supported", fsSetting);
|
exceptionMessage = format("Setting %s is not supported", fsSetting);
|
||||||
}
|
}
|
||||||
throw new UnsupportedPropertyException(exceptionMessage);
|
conversionOptions.handleError(exceptionMessage);
|
||||||
|
break;
|
||||||
case WARNING:
|
case WARNING:
|
||||||
if (message != null) {
|
String loggedMsg = (message != null) ? message :
|
||||||
LOG.warn(message);
|
format("Setting %s is not supported, ignoring conversion",
|
||||||
} else {
|
fsSetting);
|
||||||
LOG.warn("Setting {} is not supported, ignoring conversion",
|
conversionOptions.handleWarning(loggedMsg, LOG);
|
||||||
fsSetting);
|
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
|
@ -59,27 +59,21 @@ public class FSQueueConverter {
|
|||||||
private boolean fifoOrFairSharePolicyUsed;
|
private boolean fifoOrFairSharePolicyUsed;
|
||||||
private boolean drfPolicyUsedOnQueueLevel;
|
private boolean drfPolicyUsedOnQueueLevel;
|
||||||
|
|
||||||
@SuppressWarnings("checkstyle:parameternumber")
|
private ConversionOptions conversionOptions;
|
||||||
public FSQueueConverter(FSConfigToCSConfigRuleHandler ruleHandler,
|
|
||||||
Configuration capacitySchedulerConfig,
|
public FSQueueConverter(FSQueueConverterBuilder builder) {
|
||||||
boolean preemptionEnabled,
|
|
||||||
boolean sizeBasedWeight,
|
|
||||||
boolean autoCreateChildQueues,
|
|
||||||
Resource clusterResource,
|
|
||||||
float queueMaxAMShareDefault,
|
|
||||||
int queueMaxAppsDefault) {
|
|
||||||
this.leafQueueNames = new HashSet<>();
|
this.leafQueueNames = new HashSet<>();
|
||||||
this.ruleHandler = ruleHandler;
|
this.ruleHandler = builder.ruleHandler;
|
||||||
this.capacitySchedulerConfig = capacitySchedulerConfig;
|
this.capacitySchedulerConfig = builder.capacitySchedulerConfig;
|
||||||
this.preemptionEnabled = preemptionEnabled;
|
this.preemptionEnabled = builder.preemptionEnabled;
|
||||||
this.sizeBasedWeight = sizeBasedWeight;
|
this.sizeBasedWeight = builder.sizeBasedWeight;
|
||||||
this.clusterResource = clusterResource;
|
this.clusterResource = builder.clusterResource;
|
||||||
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
|
this.queueMaxAMShareDefault = builder.queueMaxAMShareDefault;
|
||||||
this.autoCreateChildQueues = autoCreateChildQueues;
|
this.autoCreateChildQueues = builder.autoCreateChildQueues;
|
||||||
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
this.queueMaxAppsDefault = builder.queueMaxAppsDefault;
|
||||||
|
this.conversionOptions = builder.conversionOptions;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("checkstyle:linelength")
|
|
||||||
public void convertQueueHierarchy(FSQueue queue) {
|
public void convertQueueHierarchy(FSQueue queue) {
|
||||||
List<FSQueue> children = queue.getChildQueues();
|
List<FSQueue> children = queue.getChildQueues();
|
||||||
final String queueName = queue.getName();
|
final String queueName = queue.getName();
|
||||||
@ -87,9 +81,9 @@ public void convertQueueHierarchy(FSQueue queue) {
|
|||||||
if (queue instanceof FSLeafQueue) {
|
if (queue instanceof FSLeafQueue) {
|
||||||
String shortName = getQueueShortName(queueName);
|
String shortName = getQueueShortName(queueName);
|
||||||
if (!leafQueueNames.add(shortName)) {
|
if (!leafQueueNames.add(shortName)) {
|
||||||
throw new ConversionException(
|
String msg = String.format("Leaf queues must be unique, "
|
||||||
"Leaf queues must be unique, "
|
+ "%s is defined at least twice", shortName);
|
||||||
+ shortName + " is defined at least twice");
|
conversionOptions.handleConversionError(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -99,7 +93,6 @@ public void convertQueueHierarchy(FSQueue queue) {
|
|||||||
emitMaxAllocations(queueName, queue);
|
emitMaxAllocations(queueName, queue);
|
||||||
emitPreemptionDisabled(queueName, queue);
|
emitPreemptionDisabled(queueName, queue);
|
||||||
|
|
||||||
// TODO: COULD BE incorrect! Needs further clarifications
|
|
||||||
emitChildCapacity(queue);
|
emitChildCapacity(queue);
|
||||||
emitMaximumCapacity(queueName, queue);
|
emitMaximumCapacity(queueName, queue);
|
||||||
emitAutoCreateChildQueue(queueName);
|
emitAutoCreateChildQueue(queueName);
|
||||||
@ -191,10 +184,13 @@ private void emitMaximumCapacity(String queueName, FSQueue queue) {
|
|||||||
if (maxResource == null) {
|
if (maxResource == null) {
|
||||||
if (rawMaxShare.getPercentages() != null) {
|
if (rawMaxShare.getPercentages() != null) {
|
||||||
if (clusterResource == null) {
|
if (clusterResource == null) {
|
||||||
throw new ConversionException(
|
String message = String.format(
|
||||||
String.format("<maxResources> defined in percentages for" +
|
"<maxResources> defined in percentages for" +
|
||||||
" queue %s, but cluster resource parameter is not" +
|
" queue %s, but cluster resource parameter is not" +
|
||||||
" defined via CLI!", queueName));
|
" defined via CLI!", queueName);
|
||||||
|
|
||||||
|
conversionOptions.handleConversionError(message);
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
ruleHandler.handleMaxCapacityPercentage(queueName);
|
ruleHandler.handleMaxCapacityPercentage(queueName);
|
||||||
@ -209,8 +205,8 @@ private void emitMaximumCapacity(String queueName, FSQueue queue) {
|
|||||||
clusterResource.getVirtualCores());
|
clusterResource.getVirtualCores());
|
||||||
defined = true;
|
defined = true;
|
||||||
} else {
|
} else {
|
||||||
throw new PreconditionException(
|
conversionOptions.handlePreconditionError(
|
||||||
"Illegal ConfigurableResource = " + rawMaxShare);
|
"Illegal ConfigurableResource object = " + rawMaxShare);
|
||||||
}
|
}
|
||||||
} else if (isNotUnboundedResource(maxResource)) {
|
} else if (isNotUnboundedResource(maxResource)) {
|
||||||
memSize = maxResource.getMemorySize();
|
memSize = maxResource.getMemorySize();
|
||||||
@ -326,8 +322,9 @@ private void emitOrderingPolicy(String queueName, FSQueue queue) {
|
|||||||
drfPolicyUsedOnQueueLevel = true;
|
drfPolicyUsedOnQueueLevel = true;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
throw new ConversionException("Unexpected ordering policy " +
|
String msg = String.format("Unexpected ordering policy " +
|
||||||
"on queue " + queueName + ": " + policy);
|
"on queue %s: %s", queue, policy);
|
||||||
|
conversionOptions.handleConversionError(msg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -489,5 +486,4 @@ public String toString() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
@ -0,0 +1,100 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
|
||||||
|
@SuppressWarnings({"checkstyle:visibilitymodifier", "checkstyle:hiddenfield"})
|
||||||
|
public final class FSQueueConverterBuilder {
|
||||||
|
FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
|
Configuration capacitySchedulerConfig;
|
||||||
|
boolean preemptionEnabled;
|
||||||
|
boolean sizeBasedWeight;
|
||||||
|
boolean autoCreateChildQueues;
|
||||||
|
Resource clusterResource;
|
||||||
|
float queueMaxAMShareDefault;
|
||||||
|
int queueMaxAppsDefault;
|
||||||
|
ConversionOptions conversionOptions;
|
||||||
|
|
||||||
|
private FSQueueConverterBuilder() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static FSQueueConverterBuilder create() {
|
||||||
|
return new FSQueueConverterBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withRuleHandler(
|
||||||
|
FSConfigToCSConfigRuleHandler ruleHandler) {
|
||||||
|
this.ruleHandler = ruleHandler;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withCapacitySchedulerConfig(
|
||||||
|
Configuration config) {
|
||||||
|
this.capacitySchedulerConfig = config;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withPreemptionEnabled(
|
||||||
|
boolean preemptionEnabled) {
|
||||||
|
this.preemptionEnabled = preemptionEnabled;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withSizeBasedWeight(
|
||||||
|
boolean sizeBasedWeight) {
|
||||||
|
this.sizeBasedWeight = sizeBasedWeight;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withAutoCreateChildQueues(
|
||||||
|
boolean autoCreateChildQueues) {
|
||||||
|
this.autoCreateChildQueues = autoCreateChildQueues;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withClusterResource(
|
||||||
|
Resource resource) {
|
||||||
|
this.clusterResource = resource;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withQueueMaxAMShareDefault(
|
||||||
|
float queueMaxAMShareDefault) {
|
||||||
|
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withQueueMaxAppsDefault(
|
||||||
|
int queueMaxAppsDefault) {
|
||||||
|
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverterBuilder withConversionOptions(
|
||||||
|
ConversionOptions conversionOptions) {
|
||||||
|
this.conversionOptions = conversionOptions;
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
public FSQueueConverter build() {
|
||||||
|
return new FSQueueConverter(this);
|
||||||
|
}
|
||||||
|
}
|
@ -16,7 +16,14 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
@ -31,13 +38,7 @@
|
|||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.IOException;
|
import com.google.common.collect.Lists;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.junit.Assert.assertFalse;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unit tests for FSConfigToCSConfigArgumentHandler.
|
* Unit tests for FSConfigToCSConfigArgumentHandler.
|
||||||
@ -53,12 +54,18 @@ public class TestFSConfigToCSConfigArgumentHandler {
|
|||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private FSConfigToCSConfigConverter mockConverter;
|
private FSConfigToCSConfigConverter mockConverter;
|
||||||
|
|
||||||
|
private DryRunResultHolder dryRunResultHolder;
|
||||||
|
private ConversionOptions conversionOptions;
|
||||||
|
|
||||||
private FSConfigConverterTestCommons fsTestCommons;
|
private FSConfigConverterTestCommons fsTestCommons;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws IOException {
|
public void setUp() throws IOException {
|
||||||
fsTestCommons = new FSConfigConverterTestCommons();
|
fsTestCommons = new FSConfigConverterTestCommons();
|
||||||
fsTestCommons.setUp();
|
fsTestCommons.setUp();
|
||||||
|
dryRunResultHolder = new DryRunResultHolder();
|
||||||
|
conversionOptions = new ConversionOptions(dryRunResultHolder, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
@ -80,7 +87,15 @@ private void setupFSConfigConversionFiles(boolean defineAllocationFile)
|
|||||||
|
|
||||||
|
|
||||||
private FSConfigToCSConfigArgumentHandler createArgumentHandler() {
|
private FSConfigToCSConfigArgumentHandler createArgumentHandler() {
|
||||||
return new FSConfigToCSConfigArgumentHandler(mockConverter);
|
FSConfigToCSConfigArgumentHandler argumentHandler =
|
||||||
|
new FSConfigToCSConfigArgumentHandler();
|
||||||
|
argumentHandler.setConverterSupplier(this::getMockConverter);
|
||||||
|
|
||||||
|
return argumentHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
private FSConfigToCSConfigConverter getMockConverter() {
|
||||||
|
return mockConverter;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String[] getDefaultArgumentsAsArray() {
|
private static String[] getDefaultArgumentsAsArray() {
|
||||||
@ -252,7 +267,7 @@ public void testConvertFSConfigurationDefaults() throws Exception {
|
|||||||
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
|
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
|
||||||
|
|
||||||
FSConfigToCSConfigArgumentHandler argumentHandler =
|
FSConfigToCSConfigArgumentHandler argumentHandler =
|
||||||
new FSConfigToCSConfigArgumentHandler(mockConverter);
|
createArgumentHandler();
|
||||||
|
|
||||||
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
||||||
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
||||||
@ -284,7 +299,7 @@ public void testConvertFSConfigurationWithConsoleParam()
|
|||||||
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
|
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
|
||||||
|
|
||||||
FSConfigToCSConfigArgumentHandler argumentHandler =
|
FSConfigToCSConfigArgumentHandler argumentHandler =
|
||||||
new FSConfigToCSConfigArgumentHandler(mockConverter);
|
createArgumentHandler();
|
||||||
|
|
||||||
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
||||||
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
||||||
@ -316,7 +331,7 @@ public void testConvertFSConfigurationClusterResource()
|
|||||||
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
|
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
|
||||||
|
|
||||||
FSConfigToCSConfigArgumentHandler argumentHandler =
|
FSConfigToCSConfigArgumentHandler argumentHandler =
|
||||||
new FSConfigToCSConfigArgumentHandler(mockConverter);
|
createArgumentHandler();
|
||||||
|
|
||||||
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
||||||
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
||||||
@ -379,4 +394,54 @@ public void testConvertFSConfigurationErrorHandling2() throws Exception {
|
|||||||
assertTrue("Error content missing", fsTestCommons.getErrContent()
|
assertTrue("Error content missing", fsTestCommons.getErrContent()
|
||||||
.toString().contains("Fatal error during FS config conversion"));
|
.toString().contains("Fatal error during FS config conversion"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWhenPreconditionExceptionOccurs() throws Exception {
|
||||||
|
testDryRunWithException(new PreconditionException("test"),
|
||||||
|
"Cannot start FS config conversion");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWhenUnsupportedPropertyExceptionExceptionOccurs()
|
||||||
|
throws Exception {
|
||||||
|
testDryRunWithException(new UnsupportedPropertyException("test"),
|
||||||
|
"Unsupported property/setting encountered");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWhenConversionExceptionExceptionOccurs()
|
||||||
|
throws Exception {
|
||||||
|
testDryRunWithException(new ConversionException("test"),
|
||||||
|
"Fatal error during FS config conversion");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWhenIllegalArgumentExceptionExceptionOccurs()
|
||||||
|
throws Exception {
|
||||||
|
testDryRunWithException(new IllegalArgumentException("test"),
|
||||||
|
"Fatal error during FS config conversion");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void testDryRunWithException(Exception exception,
|
||||||
|
String expectedErrorMessage) throws Exception {
|
||||||
|
setupFSConfigConversionFiles(true);
|
||||||
|
|
||||||
|
String[] args = getArgumentsAsArrayWithDefaults("-f",
|
||||||
|
FSConfigConverterTestCommons.FS_ALLOC_FILE,
|
||||||
|
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE, "-p",
|
||||||
|
"-d");
|
||||||
|
FSConfigToCSConfigArgumentHandler argumentHandler =
|
||||||
|
new FSConfigToCSConfigArgumentHandler(conversionOptions);
|
||||||
|
argumentHandler.setConverterSupplier(this::getMockConverter);
|
||||||
|
|
||||||
|
Mockito.doThrow(exception).when(mockConverter)
|
||||||
|
.convert(ArgumentMatchers.any(FSConfigToCSConfigConverterParams.class));
|
||||||
|
|
||||||
|
int retVal = argumentHandler.parseAndConvert(args);
|
||||||
|
assertEquals("Return value", -1, retVal);
|
||||||
|
assertEquals("Number of errors", 1, dryRunResultHolder.getErrors().size());
|
||||||
|
String error = dryRunResultHolder.getErrors().iterator().next();
|
||||||
|
assertTrue("Unexpected error message",
|
||||||
|
error.contains(expectedErrorMessage));
|
||||||
|
}
|
||||||
}
|
}
|
@ -68,6 +68,9 @@ public class TestFSConfigToCSConfigConverter {
|
|||||||
@Mock
|
@Mock
|
||||||
private FSConfigToCSConfigRuleHandler ruleHandler;
|
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private DryRunResultHolder dryRunResultHolder;
|
||||||
|
|
||||||
private FSConfigToCSConfigConverter converter;
|
private FSConfigToCSConfigConverter converter;
|
||||||
private Configuration config;
|
private Configuration config;
|
||||||
|
|
||||||
@ -93,6 +96,10 @@ private static String prepareFileName(String f) {
|
|||||||
new File("src/test/resources/conversion-rules.properties")
|
new File("src/test/resources/conversion-rules.properties")
|
||||||
.getAbsolutePath();
|
.getAbsolutePath();
|
||||||
|
|
||||||
|
private ConversionOptions createDefaultConversionOptions() {
|
||||||
|
return new ConversionOptions(new DryRunResultHolder(), false);
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
config = new Configuration(false);
|
config = new Configuration(false);
|
||||||
@ -109,7 +116,8 @@ public void tearDown() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void createConverter() {
|
private void createConverter() {
|
||||||
converter = new FSConfigToCSConfigConverter(ruleHandler);
|
converter = new FSConfigToCSConfigConverter(ruleHandler,
|
||||||
|
createDefaultConversionOptions());
|
||||||
converter.setClusterResource(CLUSTER_RESOURCE);
|
converter.setClusterResource(CLUSTER_RESOURCE);
|
||||||
ByteArrayOutputStream yarnSiteOut = new ByteArrayOutputStream();
|
ByteArrayOutputStream yarnSiteOut = new ByteArrayOutputStream();
|
||||||
csConfigOut = new ByteArrayOutputStream();
|
csConfigOut = new ByteArrayOutputStream();
|
||||||
@ -325,7 +333,8 @@ public void testConvertFSConfigurationClusterResourceInvalid2()
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertFSConfigurationRulesFile() throws Exception {
|
public void testConvertFSConfigurationRulesFile() throws Exception {
|
||||||
ruleHandler = new FSConfigToCSConfigRuleHandler();
|
ruleHandler = new FSConfigToCSConfigRuleHandler(
|
||||||
|
createDefaultConversionOptions());
|
||||||
createConverter();
|
createConverter();
|
||||||
|
|
||||||
FSConfigToCSConfigConverterParams params =
|
FSConfigToCSConfigConverterParams params =
|
||||||
|
@ -30,6 +30,8 @@
|
|||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -41,10 +43,26 @@ public class TestFSConfigToCSConfigRuleHandler {
|
|||||||
private static final String WARNING = "warning";
|
private static final String WARNING = "warning";
|
||||||
|
|
||||||
private FSConfigToCSConfigRuleHandler ruleHandler;
|
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
|
private DryRunResultHolder dryRunResultHolder;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
dryRunResultHolder = new DryRunResultHolder();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private ConversionOptions createDryRunConversionOptions() {
|
||||||
|
return new ConversionOptions(dryRunResultHolder, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConversionOptions createDefaultConversionOptions() {
|
||||||
|
return new ConversionOptions(dryRunResultHolder, false);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testInitPropertyActionsToWarning() throws IOException {
|
public void testInitPropertyActionsToWarning() throws IOException {
|
||||||
ruleHandler = new FSConfigToCSConfigRuleHandler(new Properties());
|
ruleHandler = new FSConfigToCSConfigRuleHandler(new Properties(),
|
||||||
|
createDefaultConversionOptions());
|
||||||
|
|
||||||
ruleHandler.handleChildQueueCount("test", 1);
|
ruleHandler.handleChildQueueCount("test", 1);
|
||||||
ruleHandler.handleDynamicMaxAssign();
|
ruleHandler.handleDynamicMaxAssign();
|
||||||
@ -69,7 +87,8 @@ public void testAllRulesWarning() throws IOException {
|
|||||||
rules.put(USER_MAX_APPS_DEFAULT, WARNING);
|
rules.put(USER_MAX_APPS_DEFAULT, WARNING);
|
||||||
rules.put(USER_MAX_RUNNING_APPS, WARNING);
|
rules.put(USER_MAX_RUNNING_APPS, WARNING);
|
||||||
|
|
||||||
ruleHandler = new FSConfigToCSConfigRuleHandler(rules);
|
ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
|
||||||
|
createDefaultConversionOptions());
|
||||||
|
|
||||||
ruleHandler.handleDynamicMaxAssign();
|
ruleHandler.handleDynamicMaxAssign();
|
||||||
ruleHandler.handleMaxCapacityPercentage("test");
|
ruleHandler.handleMaxCapacityPercentage("test");
|
||||||
@ -94,7 +113,8 @@ public void testAllRulesAbort() throws IOException {
|
|||||||
rules.put(USER_MAX_RUNNING_APPS, ABORT);
|
rules.put(USER_MAX_RUNNING_APPS, ABORT);
|
||||||
rules.put(MAX_CHILD_QUEUE_LIMIT, "1");
|
rules.put(MAX_CHILD_QUEUE_LIMIT, "1");
|
||||||
|
|
||||||
ruleHandler = new FSConfigToCSConfigRuleHandler(rules);
|
ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
|
||||||
|
createDefaultConversionOptions());
|
||||||
|
|
||||||
expectAbort(() -> ruleHandler.handleChildQueueCount("test", 2),
|
expectAbort(() -> ruleHandler.handleChildQueueCount("test", 2),
|
||||||
ConversionException.class);
|
ConversionException.class);
|
||||||
@ -113,11 +133,46 @@ public void testMaxChildQueueCountNotInteger() throws IOException {
|
|||||||
Properties rules = new Properties();
|
Properties rules = new Properties();
|
||||||
rules.put(MAX_CHILD_QUEUE_LIMIT, "abc");
|
rules.put(MAX_CHILD_QUEUE_LIMIT, "abc");
|
||||||
|
|
||||||
ruleHandler = new FSConfigToCSConfigRuleHandler(rules);
|
ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
|
||||||
|
createDefaultConversionOptions());
|
||||||
|
|
||||||
ruleHandler.handleChildQueueCount("test", 1);
|
ruleHandler.handleChildQueueCount("test", 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWarning() {
|
||||||
|
Properties rules = new Properties();
|
||||||
|
|
||||||
|
ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
|
||||||
|
createDryRunConversionOptions());
|
||||||
|
|
||||||
|
ruleHandler.handleDynamicMaxAssign();
|
||||||
|
ruleHandler.handleMaxChildCapacity();
|
||||||
|
|
||||||
|
assertEquals("Number of warnings", 2,
|
||||||
|
dryRunResultHolder.getWarnings().size());
|
||||||
|
assertEquals("Number of errors", 0,
|
||||||
|
dryRunResultHolder.getErrors().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunError() {
|
||||||
|
Properties rules = new Properties();
|
||||||
|
rules.put(DYNAMIC_MAX_ASSIGN, ABORT);
|
||||||
|
rules.put(MAX_CHILD_CAPACITY, ABORT);
|
||||||
|
|
||||||
|
ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
|
||||||
|
createDryRunConversionOptions());
|
||||||
|
|
||||||
|
ruleHandler.handleDynamicMaxAssign();
|
||||||
|
ruleHandler.handleMaxChildCapacity();
|
||||||
|
|
||||||
|
assertEquals("Number of warnings", 0,
|
||||||
|
dryRunResultHolder.getWarnings().size());
|
||||||
|
assertEquals("Number of errors", 2,
|
||||||
|
dryRunResultHolder.getErrors().size());
|
||||||
|
}
|
||||||
|
|
||||||
private void expectAbort(VoidCall call) {
|
private void expectAbort(VoidCall call) {
|
||||||
expectAbort(call, UnsupportedPropertyException.class);
|
expectAbort(call, UnsupportedPropertyException.class);
|
||||||
}
|
}
|
||||||
|
@ -53,6 +53,8 @@
|
|||||||
*/
|
*/
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
public class TestFSQueueConverter {
|
public class TestFSQueueConverter {
|
||||||
|
private static final float MAX_AM_SHARE_DEFAULT = 0.16f;
|
||||||
|
private static final int MAX_APPS_DEFAULT = 15;
|
||||||
private static final Resource CLUSTER_RESOURCE =
|
private static final Resource CLUSTER_RESOURCE =
|
||||||
Resource.newInstance(16384, 16);
|
Resource.newInstance(16384, 16);
|
||||||
private final static Set<String> ALL_QUEUES =
|
private final static Set<String> ALL_QUEUES =
|
||||||
@ -78,6 +80,9 @@ private static String prepareFileName(String f) {
|
|||||||
private Configuration csConfig;
|
private Configuration csConfig;
|
||||||
private FairScheduler fs;
|
private FairScheduler fs;
|
||||||
private FSQueue rootQueue;
|
private FSQueue rootQueue;
|
||||||
|
private ConversionOptions conversionOptions;
|
||||||
|
private DryRunResultHolder dryRunResultHolder;
|
||||||
|
private FSQueueConverterBuilder builder;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private FSConfigToCSConfigRuleHandler ruleHandler;
|
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
@ -91,10 +96,13 @@ public void setup() {
|
|||||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE, FAIR_SCHEDULER_XML);
|
config.set(FairSchedulerConfiguration.ALLOCATION_FILE, FAIR_SCHEDULER_XML);
|
||||||
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||||
csConfig = new Configuration(false);
|
csConfig = new Configuration(false);
|
||||||
|
dryRunResultHolder = new DryRunResultHolder();
|
||||||
|
conversionOptions =
|
||||||
|
new ConversionOptions(dryRunResultHolder, false);
|
||||||
|
|
||||||
fs = createFairScheduler();
|
fs = createFairScheduler();
|
||||||
|
createBuilder();
|
||||||
|
|
||||||
createConverter();
|
|
||||||
rootQueue = fs.getQueueManager().getRootQueue();
|
rootQueue = fs.getQueueManager().getRootQueue();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,19 +125,29 @@ private FairScheduler createFairScheduler() {
|
|||||||
return fairScheduler;
|
return fairScheduler;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createConverter() {
|
private void createBuilder() {
|
||||||
converter = new FSQueueConverter(ruleHandler,
|
builder = FSQueueConverterBuilder.create()
|
||||||
csConfig,
|
.withRuleHandler(ruleHandler)
|
||||||
false,
|
.withCapacitySchedulerConfig(csConfig)
|
||||||
false,
|
.withPreemptionEnabled(false)
|
||||||
false,
|
.withSizeBasedWeight(false)
|
||||||
CLUSTER_RESOURCE,
|
.withAutoCreateChildQueues(false)
|
||||||
0.16f,
|
.withClusterResource(CLUSTER_RESOURCE)
|
||||||
15);
|
.withQueueMaxAMShareDefault(MAX_AM_SHARE_DEFAULT)
|
||||||
|
.withQueueMaxAppsDefault(MAX_APPS_DEFAULT)
|
||||||
|
.withConversionOptions(conversionOptions);
|
||||||
|
}
|
||||||
|
|
||||||
|
private FSQueueConverter prepareDryRunConverter() {
|
||||||
|
conversionOptions.setDryRun(true);
|
||||||
|
converter = builder.withConversionOptions(conversionOptions).build();
|
||||||
|
return converter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertQueueHierarchy() {
|
public void testConvertQueueHierarchy() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root children
|
// root children
|
||||||
@ -159,6 +177,7 @@ public void testConvertQueueHierarchy() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertQueueHierarchyWithSameLeafQueues() throws Exception {
|
public void testConvertQueueHierarchyWithSameLeafQueues() throws Exception {
|
||||||
|
converter = builder.build();
|
||||||
expectedException.expect(ConversionException.class);
|
expectedException.expect(ConversionException.class);
|
||||||
expectedException.expectMessage("Leaf queues must be unique");
|
expectedException.expectMessage("Leaf queues must be unique");
|
||||||
|
|
||||||
@ -176,6 +195,8 @@ public void testConvertQueueHierarchyWithSameLeafQueues() throws Exception {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueMaxAMShare() {
|
public void testQueueMaxAMShare() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root.admins.bob
|
// root.admins.bob
|
||||||
@ -195,6 +216,8 @@ public void testQueueMaxAMShare() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueMaxRunningApps() {
|
public void testQueueMaxRunningApps() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertEquals("root.admins.alice max apps", 2,
|
assertEquals("root.admins.alice max apps", 2,
|
||||||
@ -208,6 +231,8 @@ public void testQueueMaxRunningApps() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueMaxAllocations() {
|
public void testQueueMaxAllocations() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root.admins vcores + mb
|
// root.admins vcores + mb
|
||||||
@ -231,14 +256,7 @@ public void testQueueMaxAllocations() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueuePreemptionDisabled() {
|
public void testQueuePreemptionDisabled() {
|
||||||
converter = new FSQueueConverter(ruleHandler,
|
converter = builder.withPreemptionEnabled(true).build();
|
||||||
csConfig,
|
|
||||||
true,
|
|
||||||
false,
|
|
||||||
false,
|
|
||||||
CLUSTER_RESOURCE,
|
|
||||||
0.16f,
|
|
||||||
15);
|
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
@ -256,6 +274,8 @@ public void testQueuePreemptionDisabled() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueuePreemptionDisabledWhenGlobalPreemptionDisabled() {
|
public void testQueuePreemptionDisabledWhenGlobalPreemptionDisabled() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertNoValueForQueues(ALL_QUEUES, ".disable_preemption", csConfig);
|
assertNoValueForQueues(ALL_QUEUES, ".disable_preemption", csConfig);
|
||||||
@ -263,6 +283,8 @@ public void testQueuePreemptionDisabledWhenGlobalPreemptionDisabled() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testChildCapacity() {
|
public void testChildCapacity() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root
|
// root
|
||||||
@ -288,6 +310,8 @@ public void testChildCapacity() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueMaximumCapacity() {
|
public void testQueueMaximumCapacity() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertEquals("root.users.joe maximum capacity", "[memory=8192, vcores=8]",
|
assertEquals("root.users.joe maximum capacity", "[memory=8192, vcores=8]",
|
||||||
@ -308,14 +332,10 @@ public void testQueueMaximumCapacity() {
|
|||||||
@Test
|
@Test
|
||||||
public void testQueueAutoCreateChildQueue() {
|
public void testQueueAutoCreateChildQueue() {
|
||||||
config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, true);
|
config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, true);
|
||||||
converter = new FSQueueConverter(ruleHandler,
|
converter = builder
|
||||||
csConfig,
|
.withCapacitySchedulerConfig(csConfig)
|
||||||
false,
|
.withAutoCreateChildQueues(true)
|
||||||
false,
|
.build();
|
||||||
true,
|
|
||||||
CLUSTER_RESOURCE,
|
|
||||||
0.16f,
|
|
||||||
15);
|
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
@ -325,14 +345,7 @@ public void testQueueAutoCreateChildQueue() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueSizeBasedWeightEnabled() {
|
public void testQueueSizeBasedWeightEnabled() {
|
||||||
converter = new FSQueueConverter(ruleHandler,
|
converter = builder.withSizeBasedWeight(true).build();
|
||||||
csConfig,
|
|
||||||
false,
|
|
||||||
true,
|
|
||||||
false,
|
|
||||||
CLUSTER_RESOURCE,
|
|
||||||
0.16f,
|
|
||||||
15);
|
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
@ -342,6 +355,8 @@ public void testQueueSizeBasedWeightEnabled() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueSizeBasedWeightDisabled() {
|
public void testQueueSizeBasedWeightDisabled() {
|
||||||
|
converter = builder.build();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertNoValueForQueues(ALL_QUEUES,
|
assertNoValueForQueues(ALL_QUEUES,
|
||||||
@ -350,6 +365,7 @@ public void testQueueSizeBasedWeightDisabled() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueOrderingPolicy() throws Exception {
|
public void testQueueOrderingPolicy() throws Exception {
|
||||||
|
converter = builder.build();
|
||||||
String absolutePath =
|
String absolutePath =
|
||||||
new File("src/test/resources/fair-scheduler-orderingpolicy.xml")
|
new File("src/test/resources/fair-scheduler-orderingpolicy.xml")
|
||||||
.getAbsolutePath();
|
.getAbsolutePath();
|
||||||
@ -386,6 +402,7 @@ public void testQueueOrderingPolicy() throws Exception {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testQueueMaxChildCapacityNotSupported() {
|
public void testQueueMaxChildCapacityNotSupported() {
|
||||||
|
converter = builder.build();
|
||||||
expectedException.expect(UnsupportedPropertyException.class);
|
expectedException.expect(UnsupportedPropertyException.class);
|
||||||
expectedException.expectMessage("test");
|
expectedException.expectMessage("test");
|
||||||
|
|
||||||
@ -397,6 +414,7 @@ public void testQueueMaxChildCapacityNotSupported() {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReservationSystemNotSupported() {
|
public void testReservationSystemNotSupported() {
|
||||||
|
converter = builder.build();
|
||||||
expectedException.expect(UnsupportedPropertyException.class);
|
expectedException.expect(UnsupportedPropertyException.class);
|
||||||
expectedException.expectMessage("maxCapacity");
|
expectedException.expectMessage("maxCapacity");
|
||||||
|
|
||||||
@ -407,6 +425,45 @@ public void testReservationSystemNotSupported() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWithMultipleLeafQueueNames() throws IOException {
|
||||||
|
String absolutePath =
|
||||||
|
new File("src/test/resources/fair-scheduler-sameleafqueue.xml")
|
||||||
|
.getAbsolutePath();
|
||||||
|
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||||
|
FILE_PREFIX + absolutePath);
|
||||||
|
fs.close();
|
||||||
|
fs = createFairScheduler();
|
||||||
|
rootQueue = fs.getQueueManager().getRootQueue();
|
||||||
|
|
||||||
|
prepareDryRunConverter();
|
||||||
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
|
assertEquals("Dry run errors", 1, dryRunResultHolder.getErrors().size());
|
||||||
|
assertEquals("Dry run warnings", 0,
|
||||||
|
dryRunResultHolder.getWarnings().size());
|
||||||
|
String error = dryRunResultHolder.getErrors().iterator().next();
|
||||||
|
assertTrue("Unexpected error message",
|
||||||
|
error.contains("Leaf queues must be unique"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDryRunWithNoClusterResource() {
|
||||||
|
builder.withClusterResource(null);
|
||||||
|
prepareDryRunConverter();
|
||||||
|
|
||||||
|
rootQueue = fs.getQueueManager().getRootQueue();
|
||||||
|
|
||||||
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
|
assertEquals("Dry run errors", 1, dryRunResultHolder.getErrors().size());
|
||||||
|
assertEquals("Dry run warnings", 0,
|
||||||
|
dryRunResultHolder.getWarnings().size());
|
||||||
|
String error = dryRunResultHolder.getErrors().iterator().next();
|
||||||
|
assertTrue("Unexpected error message",
|
||||||
|
error.contains("<maxResources> defined in percentages"));
|
||||||
|
}
|
||||||
|
|
||||||
private void assertNoValueForQueues(Set<String> queues, String postfix,
|
private void assertNoValueForQueues(Set<String> queues, String postfix,
|
||||||
Configuration config) {
|
Configuration config) {
|
||||||
for (String queue : queues) {
|
for (String queue : queues) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user