YARN-10641. Refactor the max app related update, and fix the maxApplications update error when adding new queues. Contributed by Qi Zhu.
This commit is contained in:
parent
38495af325
commit
ce6bfd5718
@ -1489,6 +1489,44 @@ private Resource getMinResourceNormalized(String name,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf,
|
||||||
|
LeafQueue leafQueue, String label) {
|
||||||
|
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
|
||||||
|
if (maxApplications < 0) {
|
||||||
|
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
|
||||||
|
if (maxGlobalPerQueueApps > 0) {
|
||||||
|
// In absolute resource mode, the global per-queue limit should
|
||||||
|
// be scaled down by the queue's absolute capacity for the label.
|
||||||
|
maxApplications = this.capacityConfigType
|
||||||
|
!= CapacityConfigType.ABSOLUTE_RESOURCE ?
|
||||||
|
maxGlobalPerQueueApps :
|
||||||
|
(int) (maxGlobalPerQueueApps * queueCapacities
|
||||||
|
.getAbsoluteCapacity(label));
|
||||||
|
} else{
|
||||||
|
maxApplications = (int) (conf.getMaximumSystemApplications()
|
||||||
|
* queueCapacities.getAbsoluteCapacity(label));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
leafQueue.setMaxApplications(maxApplications);
|
||||||
|
|
||||||
|
int maxApplicationsPerUser = Math.min(maxApplications,
|
||||||
|
(int) (maxApplications
|
||||||
|
* (leafQueue.getUsersManager().getUserLimit() / 100.0f)
|
||||||
|
* leafQueue.getUsersManager().getUserLimitFactor()));
|
||||||
|
if (leafQueue.getUsersManager().getUserLimitFactor() == -1) {
|
||||||
|
maxApplicationsPerUser = maxApplications;
|
||||||
|
}
|
||||||
|
|
||||||
|
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
|
||||||
|
LOG.info("LeafQueue:" + leafQueue.getQueuePath() +
|
||||||
|
"update max app related, maxApplications="
|
||||||
|
+ maxApplications + ", maxApplicationsPerUser="
|
||||||
|
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
|
||||||
|
.getAbsoluteCapacity(label) + ", Cap: " + queueCapacities
|
||||||
|
.getCapacity(label) + ", MaxCap : " + queueCapacities
|
||||||
|
.getMaximumCapacity(label));
|
||||||
|
}
|
||||||
|
|
||||||
private void deriveCapacityFromAbsoluteConfigurations(String label,
|
private void deriveCapacityFromAbsoluteConfigurations(String label,
|
||||||
Resource clusterResource, ResourceCalculator rc) {
|
Resource clusterResource, ResourceCalculator rc) {
|
||||||
|
|
||||||
@ -1522,42 +1560,6 @@ private void deriveCapacityFromAbsoluteConfigurations(String label,
|
|||||||
queueCapacities.setAbsoluteMaximumCapacity(label,
|
queueCapacities.setAbsoluteMaximumCapacity(label,
|
||||||
queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities()
|
queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities()
|
||||||
.getAbsoluteMaximumCapacity(label));
|
.getAbsoluteMaximumCapacity(label));
|
||||||
|
|
||||||
// Re-visit max applications for a queue based on absolute capacity if
|
|
||||||
// needed.
|
|
||||||
if (this instanceof LeafQueue) {
|
|
||||||
LeafQueue leafQueue = (LeafQueue) this;
|
|
||||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
|
||||||
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
|
|
||||||
if (maxApplications < 0) {
|
|
||||||
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
|
|
||||||
if (maxGlobalPerQueueApps > 0) {
|
|
||||||
maxApplications = (int) (maxGlobalPerQueueApps * queueCapacities
|
|
||||||
.getAbsoluteCapacity(label));
|
|
||||||
} else{
|
|
||||||
maxApplications =
|
|
||||||
(int) (conf.getMaximumSystemApplications() * queueCapacities
|
|
||||||
.getAbsoluteCapacity(label));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
leafQueue.setMaxApplications(maxApplications);
|
|
||||||
|
|
||||||
int maxApplicationsPerUser = Math.min(maxApplications,
|
|
||||||
(int) (maxApplications
|
|
||||||
* (leafQueue.getUsersManager().getUserLimit() / 100.0f)
|
|
||||||
* leafQueue.getUsersManager().getUserLimitFactor()));
|
|
||||||
if (leafQueue.getUsersManager().getUserLimitFactor() == -1) {
|
|
||||||
maxApplicationsPerUser = maxApplications;
|
|
||||||
}
|
|
||||||
|
|
||||||
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
|
|
||||||
LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
|
|
||||||
+ maxApplications + ", maxApplicationsPerUser="
|
|
||||||
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
|
|
||||||
.getAbsoluteCapacity(label) + ", Cap: " + queueCapacities
|
|
||||||
.getCapacity(label) + ", MaxCap : " + queueCapacities
|
|
||||||
.getMaximumCapacity(label));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateEffectiveResources(Resource clusterResource) {
|
void updateEffectiveResources(Resource clusterResource) {
|
||||||
@ -1603,6 +1605,13 @@ void updateEffectiveResources(Resource clusterResource) {
|
|||||||
// percentage, we have to calculate percentage and update.
|
// percentage, we have to calculate percentage and update.
|
||||||
ResourceCalculator rc = this.csContext.getResourceCalculator();
|
ResourceCalculator rc = this.csContext.getResourceCalculator();
|
||||||
deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
|
deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
|
||||||
|
// Re-visit max applications for a queue based on absolute capacity if
|
||||||
|
// needed.
|
||||||
|
if (this instanceof LeafQueue) {
|
||||||
|
LeafQueue leafQueue = (LeafQueue) this;
|
||||||
|
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||||
|
updateMaxAppRelatedField(conf, leafQueue, label);
|
||||||
|
}
|
||||||
} else{
|
} else{
|
||||||
queueResourceQuotas.setEffectiveMinResource(label, Resources
|
queueResourceQuotas.setEffectiveMinResource(label, Resources
|
||||||
.multiply(resourceByLabel,
|
.multiply(resourceByLabel,
|
||||||
|
@ -477,6 +477,13 @@ public int getMaximumApplicationsPerQueue(String queue) {
|
|||||||
return maxApplicationsPerQueue;
|
return maxApplicationsPerQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void setMaximumApplicationsPerQueue(String queue,
|
||||||
|
int numMaxApps) {
|
||||||
|
setInt(getQueuePrefix(queue) + MAXIMUM_APPLICATIONS_SUFFIX,
|
||||||
|
numMaxApps);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the maximum am resource percent per queue setting.
|
* Get the maximum am resource percent per queue setting.
|
||||||
* @param queue name of the queue
|
* @param queue name of the queue
|
||||||
|
@ -87,6 +87,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|||||||
|
|
||||||
private float absoluteUsedCapacity = 0.0f;
|
private float absoluteUsedCapacity = 0.0f;
|
||||||
|
|
||||||
|
// TODO the max applications should consider label
|
||||||
protected int maxApplications;
|
protected int maxApplications;
|
||||||
protected volatile int maxApplicationsPerUser;
|
protected volatile int maxApplicationsPerUser;
|
||||||
|
|
||||||
@ -1915,28 +1916,6 @@ private void updateCurrentResourceLimits(
|
|||||||
currentResourceLimits.getLimit()));
|
currentResourceLimits.getLimit()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateAbsoluteCapacitiesAndRelatedFields() {
|
|
||||||
updateAbsoluteCapacities();
|
|
||||||
CapacitySchedulerConfiguration schedulerConf = csContext.getConfiguration();
|
|
||||||
|
|
||||||
// If maxApplications not set, use the system total max app, apply newly
|
|
||||||
// calculated abs capacity of the queue.
|
|
||||||
if (maxApplications <= 0) {
|
|
||||||
int maxSystemApps = schedulerConf.
|
|
||||||
getMaximumSystemApplications();
|
|
||||||
maxApplications =
|
|
||||||
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
|
|
||||||
}
|
|
||||||
maxApplicationsPerUser =
|
|
||||||
Math.min(maxApplications,
|
|
||||||
(int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
|
|
||||||
* usersManager.getUserLimitFactor()));
|
|
||||||
|
|
||||||
if (getUserLimitFactor() == -1) {
|
|
||||||
maxApplicationsPerUser = maxApplications;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void updateClusterResource(Resource clusterResource,
|
public void updateClusterResource(Resource clusterResource,
|
||||||
ResourceLimits currentResourceLimits) {
|
ResourceLimits currentResourceLimits) {
|
||||||
@ -1944,7 +1923,14 @@ public void updateClusterResource(Resource clusterResource,
|
|||||||
try {
|
try {
|
||||||
lastClusterResource = clusterResource;
|
lastClusterResource = clusterResource;
|
||||||
|
|
||||||
updateAbsoluteCapacitiesAndRelatedFields();
|
updateAbsoluteCapacities();
|
||||||
|
|
||||||
|
// If maxApplications not set, use the system total max app, apply newly
|
||||||
|
// calculated abs capacity of the queue.
|
||||||
|
// When a new queue is added, the parent queue's other children should also
|
||||||
|
// update their max apps.
|
||||||
|
super.updateMaxAppRelatedField(csContext.getConfiguration(),
|
||||||
|
this, CommonNodeLabelsManager.NO_LABEL);
|
||||||
|
|
||||||
super.updateEffectiveResources(clusterResource);
|
super.updateEffectiveResources(clusterResource);
|
||||||
|
|
||||||
|
@ -549,6 +549,63 @@ public void testAutoCreateQueueUserLimitDisabled() throws Exception {
|
|||||||
user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6);
|
user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAutoQueueCreationMaxAppUpdate() throws Exception {
|
||||||
|
startScheduler();
|
||||||
|
|
||||||
|
// When max apps is not configured
|
||||||
|
LeafQueue a1 = (LeafQueue)cs.
|
||||||
|
getQueue("root.a.a1");
|
||||||
|
Assert.assertNotNull(a1);
|
||||||
|
Assert.assertEquals(csConf.getMaximumSystemApplications()
|
||||||
|
* a1.getAbsoluteCapacity(), a1.getMaxApplications(), 1);
|
||||||
|
|
||||||
|
LeafQueue b = (LeafQueue)cs.
|
||||||
|
getQueue("root.b");
|
||||||
|
Assert.assertNotNull(b);
|
||||||
|
Assert.assertEquals(csConf.getMaximumSystemApplications()
|
||||||
|
* b.getAbsoluteCapacity(), b.getMaxApplications(), 1);
|
||||||
|
|
||||||
|
createQueue("root.e");
|
||||||
|
|
||||||
|
// Make sure the max apps of the other child queues
|
||||||
|
// are still correct.
|
||||||
|
LeafQueue e = (LeafQueue)cs.
|
||||||
|
getQueue("root.e");
|
||||||
|
Assert.assertNotNull(e);
|
||||||
|
Assert.assertEquals(csConf.getMaximumSystemApplications()
|
||||||
|
* e.getAbsoluteCapacity(), e.getMaxApplications(), 1);
|
||||||
|
|
||||||
|
a1 = (LeafQueue)cs.
|
||||||
|
getQueue("root.a.a1");
|
||||||
|
Assert.assertNotNull(a1);
|
||||||
|
Assert.assertEquals(csConf.getMaximumSystemApplications()
|
||||||
|
* a1.getAbsoluteCapacity(), a1.getMaxApplications(), 1);
|
||||||
|
|
||||||
|
b = (LeafQueue)cs.
|
||||||
|
getQueue("root.b");
|
||||||
|
Assert.assertNotNull(b);
|
||||||
|
Assert.assertEquals(csConf.getMaximumSystemApplications()
|
||||||
|
* b.getAbsoluteCapacity(), b.getMaxApplications(), 1);
|
||||||
|
|
||||||
|
// When the global max apps per queue is updated
|
||||||
|
csConf.setGlobalMaximumApplicationsPerQueue(1000);
|
||||||
|
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||||
|
Assert.assertEquals(1000, b.getMaxApplications());
|
||||||
|
Assert.assertEquals(1000, a1.getMaxApplications());
|
||||||
|
Assert.assertEquals(1000, e.getMaxApplications());
|
||||||
|
|
||||||
|
// When max apps is set for a specific queue
|
||||||
|
csConf.setMaximumApplicationsPerQueue("root.e1", 50);
|
||||||
|
createQueue("root.e1");
|
||||||
|
LeafQueue e1 = (LeafQueue)cs.
|
||||||
|
getQueue("root.e1");
|
||||||
|
Assert.assertNotNull(e1);
|
||||||
|
|
||||||
|
cs.reinitialize(csConf, mockRM.getRMContext());
|
||||||
|
Assert.assertEquals(50, e1.getMaxApplications());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateQueueIfAmbiguousQueueNames() throws Exception {
|
public void testAutoCreateQueueIfAmbiguousQueueNames() throws Exception {
|
||||||
startScheduler();
|
startScheduler();
|
||||||
|
Loading…
x
Reference in New Issue
Block a user