YARN-10316. FS-CS converter: convert maxAppsDefault, maxRunningApps settings. Contributed by Peter Bacsko
This commit is contained in:
parent
fa14e4bc00
commit
03f855e3e7
|
@ -83,6 +83,9 @@ public class FSConfigToCSConfigConverter {
|
|||
private boolean preemptionEnabled = false;
|
||||
private int queueMaxAppsDefault;
|
||||
private float queueMaxAMShareDefault;
|
||||
private Map<String, Integer> userMaxApps;
|
||||
private int userMaxAppsDefault;
|
||||
|
||||
private boolean autoCreateChildQueues = false;
|
||||
private boolean sizeBasedWeight = false;
|
||||
private boolean userAsDefaultQueue = false;
|
||||
|
@ -99,6 +102,8 @@ public class FSConfigToCSConfigConverter {
|
|||
private boolean consoleMode = false;
|
||||
private boolean convertPlacementRules = false;
|
||||
|
||||
|
||||
|
||||
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
|
||||
ruleHandler, ConversionOptions conversionOptions) {
|
||||
this.ruleHandler = ruleHandler;
|
||||
|
@ -242,14 +247,13 @@ public class FSConfigToCSConfigConverter {
|
|||
|
||||
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
|
||||
queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault();
|
||||
userMaxAppsDefault = allocConf.getUserMaxAppsDefault();
|
||||
userMaxApps = allocConf.getUserMaxApps();
|
||||
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();
|
||||
|
||||
convertedYarnSiteConfig = new Configuration(false);
|
||||
capacitySchedulerConfig = new Configuration(false);
|
||||
|
||||
checkUserMaxApps(allocConf);
|
||||
checkUserMaxAppsDefault(allocConf);
|
||||
|
||||
convertYarnSiteXml(inputYarnSiteConfig, havePlacementPolicies);
|
||||
convertCapacitySchedulerXml(fs);
|
||||
|
||||
|
@ -287,7 +291,9 @@ public class FSConfigToCSConfigConverter {
|
|||
|
||||
private void convertCapacitySchedulerXml(FairScheduler fs) {
|
||||
FSParentQueue rootQueue = fs.getQueueManager().getRootQueue();
|
||||
emitDefaultMaxApplications();
|
||||
emitDefaultQueueMaxParallelApplications();
|
||||
emitDefaultUserMaxParallelApplications();
|
||||
emitUserMaxParallelApplications();
|
||||
emitDefaultMaxAMShare();
|
||||
|
||||
FSQueueConverter queueConverter = FSQueueConverterBuilder.create()
|
||||
|
@ -322,14 +328,30 @@ public class FSConfigToCSConfigConverter {
|
|||
}
|
||||
}
|
||||
|
||||
private void emitDefaultMaxApplications() {
|
||||
private void emitDefaultQueueMaxParallelApplications() {
|
||||
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
|
||||
capacitySchedulerConfig.set(
|
||||
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
|
||||
PREFIX + "max-parallel-apps",
|
||||
String.valueOf(queueMaxAppsDefault));
|
||||
}
|
||||
}
|
||||
|
||||
private void emitDefaultUserMaxParallelApplications() {
|
||||
if (userMaxAppsDefault != Integer.MAX_VALUE) {
|
||||
capacitySchedulerConfig.set(
|
||||
PREFIX + "user.max-parallel-apps",
|
||||
String.valueOf(userMaxAppsDefault));
|
||||
}
|
||||
}
|
||||
|
||||
private void emitUserMaxParallelApplications() {
|
||||
userMaxApps
|
||||
.forEach((user, apps) -> {
|
||||
capacitySchedulerConfig.setInt(
|
||||
PREFIX + "user." + user + ".max-parallel-apps", apps);
|
||||
});
|
||||
}
|
||||
|
||||
private void emitDefaultMaxAMShare() {
|
||||
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
|
||||
capacitySchedulerConfig.setFloat(
|
||||
|
@ -374,19 +396,6 @@ public class FSConfigToCSConfigConverter {
|
|||
}
|
||||
}
|
||||
|
||||
private void checkUserMaxApps(AllocationConfiguration allocConf) {
|
||||
if (allocConf.getUserMaxApps() != null
|
||||
&& allocConf.getUserMaxApps().size() > 0) {
|
||||
ruleHandler.handleUserMaxApps();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkUserMaxAppsDefault(AllocationConfiguration allocConf) {
|
||||
if (allocConf.getUserMaxAppsDefault() > 0) {
|
||||
ruleHandler.handleUserMaxAppsDefault();
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isDrfUsed(FairScheduler fs) {
|
||||
FSQueue rootQueue = fs.getQueueManager().getRootQueue();
|
||||
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
|
||||
|
|
|
@ -170,14 +170,6 @@ public class FSConfigToCSConfigRuleHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public void handleUserMaxApps() {
|
||||
handle(USER_MAX_RUNNING_APPS, "<maxRunningApps>", null);
|
||||
}
|
||||
|
||||
public void handleUserMaxAppsDefault() {
|
||||
handle(USER_MAX_APPS_DEFAULT, "<userMaxAppsDefault>", null);
|
||||
}
|
||||
|
||||
public void handleDynamicMaxAssign() {
|
||||
handle(DYNAMIC_MAX_ASSIGN,
|
||||
FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, null);
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
*/
|
||||
public class FSQueueConverter {
|
||||
public static final float QUEUE_MAX_AM_SHARE_DISABLED = -1.0f;
|
||||
private static final int MAX_RUNNING_APPS_UNSET = Integer.MIN_VALUE;
|
||||
private static final int MAX_RUNNING_APPS_UNSET = Integer.MAX_VALUE;
|
||||
private static final String FAIR_POLICY = "fair";
|
||||
private static final String FIFO_POLICY = "fifo";
|
||||
|
||||
|
@ -79,7 +79,7 @@ public class FSQueueConverter {
|
|||
|
||||
emitChildQueues(queueName, children);
|
||||
emitMaxAMShare(queueName, queue);
|
||||
emitMaxRunningApps(queueName, queue);
|
||||
emitMaxParallelApps(queueName, queue);
|
||||
emitMaxAllocations(queueName, queue);
|
||||
emitPreemptionDisabled(queueName, queue);
|
||||
|
||||
|
@ -138,14 +138,14 @@ public class FSQueueConverter {
|
|||
|
||||
/**
|
||||
* <maxRunningApps>
|
||||
* ==> yarn.scheduler.capacity.<queue-name>.maximum-applications.
|
||||
* ==> yarn.scheduler.capacity.<queue-name>.max-parallel-apps.
|
||||
* @param queueName
|
||||
* @param queue
|
||||
*/
|
||||
private void emitMaxRunningApps(String queueName, FSQueue queue) {
|
||||
private void emitMaxParallelApps(String queueName, FSQueue queue) {
|
||||
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
|
||||
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
|
||||
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-applications",
|
||||
capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps",
|
||||
String.valueOf(queue.getMaxRunningApps()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -155,17 +155,7 @@ public class TestFSConfigToCSConfigConverter {
|
|||
.withOutputDirectory(FSConfigConverterTestCommons.OUTPUT_DIR);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultMaxApplications() throws Exception {
|
||||
converter.convert(config);
|
||||
|
||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
||||
int maxApps =
|
||||
conf.getInt(
|
||||
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, -1);
|
||||
|
||||
assertEquals("Default max apps", 15, maxApps);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultMaxAMShare() throws Exception {
|
||||
|
@ -252,14 +242,52 @@ public class TestFSConfigToCSConfigConverter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultMaxRunningApps() throws Exception {
|
||||
public void testDefaultQueueMaxParallelApps() throws Exception {
|
||||
converter.convert(config);
|
||||
|
||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
||||
|
||||
// default setting
|
||||
assertEquals("Default max apps", 15,
|
||||
conf.getInt(PREFIX + "maximum-applications", -1));
|
||||
assertEquals("Default max parallel apps", 15,
|
||||
conf.getInt(PREFIX + "max-parallel-apps", -1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecificQueueMaxParallelApps() throws Exception {
|
||||
converter.convert(config);
|
||||
|
||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
||||
|
||||
assertEquals("root.admins.alice max parallel apps", 2,
|
||||
conf.getInt(PREFIX + "root.admins.alice.max-parallel-apps", -1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDefaultUserMaxParallelApps() throws Exception {
|
||||
converter.convert(config);
|
||||
|
||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
||||
int userMaxParallelApps =
|
||||
conf.getInt(
|
||||
PREFIX + "user.max-parallel-apps", -1);
|
||||
|
||||
assertEquals("Default user max parallel apps", 10,
|
||||
userMaxParallelApps);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecificUserMaxParallelApps() throws Exception {
|
||||
converter.convert(config);
|
||||
|
||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
||||
|
||||
assertEquals("Max parallel apps for alice", 30,
|
||||
conf.getInt(PREFIX + "user.alice.max-parallel-apps", -1));
|
||||
assertNull("Max parallel apps should be undefined for user bob",
|
||||
conf.get(PREFIX + "user.bob.max-parallel-apps"));
|
||||
assertNull("Max parallel apps should be undefined for user joe",
|
||||
conf.get(PREFIX + "user.joe.max-parallel-apps"));
|
||||
assertNull("Max parallel apps should be undefined for user john",
|
||||
conf.get(PREFIX + "user.john.max-parallel-apps"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -285,28 +313,6 @@ public class TestFSConfigToCSConfigConverter {
|
|||
converter.convert(config);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserMaxAppsNotSupported() throws Exception {
|
||||
expectedException.expect(UnsupportedPropertyException.class);
|
||||
expectedException.expectMessage("userMaxApps");
|
||||
|
||||
Mockito.doThrow(new UnsupportedPropertyException("userMaxApps"))
|
||||
.when(ruleHandler).handleUserMaxApps();
|
||||
|
||||
converter.convert(config);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserMaxAppsDefaultNotSupported() throws Exception {
|
||||
expectedException.expect(UnsupportedPropertyException.class);
|
||||
expectedException.expectMessage("userMaxAppsDefault");
|
||||
|
||||
Mockito.doThrow(new UnsupportedPropertyException("userMaxAppsDefault"))
|
||||
.when(ruleHandler).handleUserMaxAppsDefault();
|
||||
|
||||
converter.convert(config);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertFSConfigurationClusterResource() throws Exception {
|
||||
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
|
||||
|
|
|
@ -76,8 +76,6 @@ public class TestFSConfigToCSConfigRuleHandler {
|
|||
ruleHandler.handleQueueAutoCreate("test");
|
||||
ruleHandler.handleReservationSystem();
|
||||
ruleHandler.handleSpecifiedNotFirstRule();
|
||||
ruleHandler.handleUserMaxApps();
|
||||
ruleHandler.handleUserMaxAppsDefault();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -106,8 +104,6 @@ public class TestFSConfigToCSConfigRuleHandler {
|
|||
ruleHandler.handleQueueAutoCreate("test");
|
||||
ruleHandler.handleReservationSystem();
|
||||
ruleHandler.handleSpecifiedNotFirstRule();
|
||||
ruleHandler.handleUserMaxApps();
|
||||
ruleHandler.handleUserMaxAppsDefault();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -140,8 +136,6 @@ public class TestFSConfigToCSConfigRuleHandler {
|
|||
expectAbort(() -> ruleHandler.handleQueueAutoCreate("test"));
|
||||
expectAbort(() -> ruleHandler.handleReservationSystem());
|
||||
expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule());
|
||||
expectAbort(() -> ruleHandler.handleUserMaxApps());
|
||||
expectAbort(() -> ruleHandler.handleUserMaxAppsDefault());
|
||||
expectAbort(() -> ruleHandler.handleFairAsDrf("test"));
|
||||
}
|
||||
|
||||
|
|
|
@ -195,18 +195,18 @@ public class TestFSQueueConverter {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testQueueMaxRunningApps() {
|
||||
public void testQueueMaxParallelApps() {
|
||||
converter = builder.build();
|
||||
|
||||
converter.convertQueueHierarchy(rootQueue);
|
||||
|
||||
assertEquals("root.admins.alice max apps", 2,
|
||||
csConfig.getInt(PREFIX + "root.admins.alice.maximum-applications",
|
||||
csConfig.getInt(PREFIX + "root.admins.alice.max-parallel-apps",
|
||||
-1));
|
||||
|
||||
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
||||
Sets.newHashSet("root.admins.alice"));
|
||||
assertNoValueForQueues(remaining, ".maximum-applications", csConfig);
|
||||
assertNoValueForQueues(remaining, ".max-parallel-apps", csConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue