YARN-10657. We should make max application per queue to support node label. Contributed by Andras Gyori.

This commit is contained in:
zhuqi-lucas 2021-07-22 20:30:43 +08:00
parent dbd255f4a9
commit 2da9b95d4d
6 changed files with 77 additions and 34 deletions

View File

@ -1528,21 +1528,30 @@ public abstract class AbstractCSQueue implements CSQueue {
}
void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf,
LeafQueue leafQueue, String label) {
LeafQueue leafQueue) {
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
String maxLabel = RMNodeLabelsManager.NO_LABEL;
if (maxApplications < 0) {
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
// In absolute mode, should
// shrink when change to corresponding label capacity.
maxApplications = this.capacityConfigType
!= CapacityConfigType.ABSOLUTE_RESOURCE ?
maxGlobalPerQueueApps :
(int) (maxGlobalPerQueueApps * queueCapacities
.getAbsoluteCapacity(label));
} else{
maxApplications = (int) (conf.getMaximumSystemApplications()
* queueCapacities.getAbsoluteCapacity(label));
for (String label : configuredNodeLabels) {
int maxApplicationsByLabel = 0;
if (maxGlobalPerQueueApps > 0) {
// In absolute mode, should
// shrink when change to corresponding label capacity.
maxApplicationsByLabel = this.capacityConfigType
!= CapacityConfigType.ABSOLUTE_RESOURCE ?
maxGlobalPerQueueApps :
(int) (maxGlobalPerQueueApps * queueCapacities
.getAbsoluteCapacity(label));
} else {
maxApplicationsByLabel = (int) (conf.getMaximumSystemApplications()
* queueCapacities.getAbsoluteCapacity(label));
}
if (maxApplicationsByLabel > maxApplications) {
maxApplications = maxApplicationsByLabel;
maxLabel = label;
}
}
}
leafQueue.setMaxApplications(maxApplications);
@ -1560,9 +1569,9 @@ public abstract class AbstractCSQueue implements CSQueue {
"update max app related, maxApplications="
+ maxApplications + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
.getAbsoluteCapacity(label) + ", Cap: " + queueCapacities
.getCapacity(label) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(label));
.getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
.getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(maxLabel));
}
private void deriveCapacityFromAbsoluteConfigurations(String label,
@ -1643,11 +1652,6 @@ public abstract class AbstractCSQueue implements CSQueue {
deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
// Re-visit max applications for a queue based on absolute capacity if
// needed.
if (this instanceof LeafQueue) {
LeafQueue leafQueue = (LeafQueue) this;
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
updateMaxAppRelatedField(conf, leafQueue, label);
}
} else{
queueResourceQuotas.setEffectiveMinResource(label, Resources
.multiply(resourceByLabel,

View File

@ -632,6 +632,11 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
return configuredNodeLabels;
}
/**
 * Replaces this manager's configured node labels with a fresh
 * {@code ConfiguredNodeLabels} instance built from the given configuration.
 * Marked as visible for testing only.
 *
 * @param conf configuration from which the node labels are rebuilt
 */
@VisibleForTesting
public void reinitConfiguredNodeLabels(CapacitySchedulerConfiguration conf) {
  ConfiguredNodeLabels rebuiltLabels = new ConfiguredNodeLabels(conf);
  this.configuredNodeLabels = rebuiltLabels;
}
private LeafQueue createAutoQueue(ApplicationPlacementContext queue)
throws SchedulerDynamicEditException {
List<String> parentsToCreate = determineMissingParents(queue);

View File

@ -1945,14 +1945,9 @@ public class LeafQueue extends AbstractCSQueue {
updateAbsoluteCapacities();
// If maxApplications not set, use the system total max app, apply newly
// calculated abs capacity of the queue.
// When add new queue, the parent queue's other children should also
// update the max app.
super.updateMaxAppRelatedField(csContext.getConfiguration(),
this, CommonNodeLabelsManager.NO_LABEL);
super.updateEffectiveResources(clusterResource);
super.updateMaxAppRelatedField(csContext.getConfiguration(),
this);
updateCurrentResourceLimits(currentResourceLimits, clusterResource);

View File

@ -141,7 +141,7 @@ public class TestCapacitySchedulerAutoQueueCreation
validateInitialQueueEntitlement(parentQueue, USER0,
expectedChildQueueAbsCapacity, accessibleNodeLabelsOnC);
validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
validateUserAndAppLimits(autoCreatedLeafQueue, 4000, 4000);
validateContainerLimits(autoCreatedLeafQueue);
assertTrue(autoCreatedLeafQueue
@ -911,7 +911,7 @@ public class TestCapacitySchedulerAutoQueueCreation
AutoCreatedLeafQueue user0Queue = (AutoCreatedLeafQueue) newCS.getQueue(
USER1);
validateCapacities(user0Queue, 0.5f, 0.15f, 1.0f, 0.5f);
validateUserAndAppLimits(user0Queue, 1500, 1500);
validateUserAndAppLimits(user0Queue, 4000, 4000);
//update leaf queue template capacities
conf.setAutoCreatedLeafQueueConfigCapacity(C, 30f);
@ -919,7 +919,7 @@ public class TestCapacitySchedulerAutoQueueCreation
newCS.reinitialize(conf, newMockRM.getRMContext());
validateCapacities(user0Queue, 0.3f, 0.09f, 0.4f, 0.2f);
validateUserAndAppLimits(user0Queue, 900, 900);
validateUserAndAppLimits(user0Queue, 4000, 4000);
//submit app1 as USER3
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
@ -927,7 +927,7 @@ public class TestCapacitySchedulerAutoQueueCreation
(AutoCreatedLeafQueue) newCS.getQueue(USER1);
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
validateUserAndAppLimits(user3Queue, 900, 900);
validateUserAndAppLimits(user3Queue, 4000, 4000);
//submit app1 as USER1 - is already activated. there should be no diff
// in capacities
@ -935,7 +935,7 @@ public class TestCapacitySchedulerAutoQueueCreation
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f,0.2f);
validateUserAndAppLimits(user3Queue, 900, 900);
validateUserAndAppLimits(user3Queue, 4000, 4000);
validateContainerLimits(user3Queue);
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =

View File

@ -5120,6 +5120,45 @@ public class TestLeafQueue {
return queue;
}
/**
 * Checks how the maximum application count of leaf queue "e" follows node
 * label capacities. First "e" gets a zero capacity on the "test" label and
 * the value is compared against the one derived from its default absolute
 * capacity; then "e" gets 60 on the "test" label and the value is compared
 * against the one derived from its absolute capacity on that label.
 */
@Test
public void testMaxApplicationsWithNodeLabels() throws IOException {
  final String label = "test";
  final String failureMessage =
      "Maximum application is not calculated properly";

  CapacitySchedulerConfiguration schedulerConf = csConf;
  String parentPath = root.getChildQueues().get(0).getQueuePath();
  String pathA = parentPath + "." + A;
  String pathB = parentPath + "." + B;
  String pathC = parentPath + "." + C;
  String pathD = parentPath + "." + D;
  String pathE = parentPath + "." + E;

  // Phase 1: queue "e" has no share of the "test" label.
  schedulerConf.setCapacityByLabel(ROOT, label, 100);
  schedulerConf.setCapacityByLabel(parentPath, label, 100);
  schedulerConf.setCapacityByLabel(pathA, label, 20);
  schedulerConf.setCapacityByLabel(pathB, label, 40);
  schedulerConf.setCapacityByLabel(pathC, label, 10);
  schedulerConf.setCapacityByLabel(pathC + "." + C1, label, 100);
  schedulerConf.setCapacityByLabel(pathD, label, 30);
  schedulerConf.setCapacityByLabel(pathE, label, 0);

  // The queue manager has to pick up the newly configured label before the
  // scheduler is reinitialized.
  cs.getCapacitySchedulerQueueManager()
      .reinitConfiguredNodeLabels(schedulerConf);
  cs.setMaxRunningAppsEnforcer(new CSMaxRunningAppsEnforcer(cs));
  cs.reinitialize(schedulerConf, cs.getRMContext());

  LeafQueue leafE = (LeafQueue) cs.getQueue("e");
  // Expected value comes from the default node label.
  int expectedByDefaultLabel = (int) (schedulerConf
      .getMaximumSystemApplications() * leafE.getAbsoluteCapacity());
  Assert.assertEquals(failureMessage, expectedByDefaultLabel,
      leafE.getMaxApplications());

  // Phase 2: shift most of the "test" label capacity to queue "e".
  schedulerConf.setCapacityByLabel(pathA, label, 10);
  schedulerConf.setCapacityByLabel(pathB, label, 10);
  schedulerConf.setCapacityByLabel(pathC, label, 10);
  schedulerConf.setCapacityByLabel(pathD, label, 10);
  schedulerConf.setCapacityByLabel(pathE, label, 60);
  cs.reinitialize(schedulerConf, cs.getRMContext());

  leafE = (LeafQueue) cs.getQueue("e");
  // The "test" label is now expected to decide the value, because it would
  // yield a higher number than the default node label.
  int expectedByTestLabel = (int) (schedulerConf
      .getMaximumSystemApplications()
      * leafE.getQueueCapacities().getAbsoluteCapacity(label));
  Assert.assertEquals(failureMessage, expectedByTestLabel,
      leafE.getMaxApplications());
}
@After
public void tearDown() throws Exception {
if (cs != null) {

View File

@ -141,7 +141,7 @@ Configuration
| Property | Description |
|:---- |:---- |
| `yarn.scheduler.capacity.maximum-applications` / `yarn.scheduler.capacity.<queue-path>.maximum-applications` | Maximum number of applications in the system which can be concurrently active both running and pending. Limits on each queue are directly proportional to their queue capacities and user limits. This is a hard limit and any applications submitted when this limit is reached will be rejected. Default is 10000. This can be set for all queues with `yarn.scheduler.capacity.maximum-applications` and can also be overridden on a per queue basis by setting `yarn.scheduler.capacity.<queue-path>.maximum-applications`. Integer value expected. |
| `yarn.scheduler.capacity.maximum-applications` / `yarn.scheduler.capacity.<queue-path>.maximum-applications` | Maximum number of applications in the system which can be concurrently active both running and pending. Limits on each queue are directly proportional to their queue capacities and user limits. This is a hard limit and any applications submitted when this limit is reached will be rejected. Default is 10000. This can be set for all queues with `yarn.scheduler.capacity.maximum-applications` and can also be overridden on a per queue basis by setting `yarn.scheduler.capacity.<queue-path>.maximum-applications`. When this property is not set for a specific queue path, the maximum application number is calculated by taking all configured node labels into consideration, and choosing the highest possible value. Integer value expected. |
| `yarn.scheduler.capacity.maximum-am-resource-percent` / `yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent` | Maximum percent of resources in the cluster which can be used to run application masters - controls number of concurrent active applications. Limits on each queue are directly proportional to their queue capacities and user limits. Specified as a float - i.e. 0.5 = 50%. Default is 10%. This can be set for all queues with `yarn.scheduler.capacity.maximum-am-resource-percent` and can also be overridden on a per queue basis by setting `yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent`. |
| `yarn.scheduler.capacity.max-parallel-apps` / `yarn.scheduler.capacity.<queue-path>.max-parallel-apps` | Maximum number of applications that can run at the same time. Unlike `maximum-applications`, application submissions are *not* rejected when this limit is reached. Instead they stay in `ACCEPTED` state until they are eligible to run. This can be set for all queues with `yarn.scheduler.capacity.max-parallel-apps` and can also be overridden on a per queue basis by setting `yarn.scheduler.capacity.<queue-path>.max-parallel-apps`. Integer value is expected. By default, there is no limit. |