YARN-11067. Resource overcommitment due to incorrect resource normalisation logical order. Contributed by Andras Gyori
This commit is contained in:
parent
481da19494
commit
ed65aa2324
|
@ -1294,17 +1294,24 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
|
||||
private void calculateEffectiveResourcesAndCapacity(String label,
|
||||
Resource clusterResource) {
|
||||
// Update effective resources for my self;
|
||||
if (rootQueue) {
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource);
|
||||
usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label, resourceByLabel);
|
||||
usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label, resourceByLabel);
|
||||
} else {
|
||||
super.updateEffectiveResources(clusterResource);
|
||||
}
|
||||
|
||||
recalculateEffectiveMinRatio(label, clusterResource);
|
||||
}
|
||||
|
||||
private void recalculateEffectiveMinRatio(String label, Resource clusterResource) {
|
||||
// For root queue, ensure that max/min resource is updated to latest
|
||||
// cluster resource.
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(label,
|
||||
clusterResource);
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(label, clusterResource);
|
||||
|
||||
/*
|
||||
* == Below logic are added to calculate effectiveMinRatioPerResource ==
|
||||
*/
|
||||
|
||||
// Total configured min resources of direct children of this given parent
|
||||
// queue
|
||||
// Total configured min resources of direct children of this given parent queue
|
||||
Resource configuredMinResources = Resource.newInstance(0L, 0);
|
||||
for (CSQueue childQueue : getChildQueues()) {
|
||||
Resources.addTo(configuredMinResources,
|
||||
|
@ -1312,8 +1319,7 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
}
|
||||
|
||||
// Factor to scale down effective resource: When cluster has sufficient
|
||||
// resources, effective_min_resources will be same as configured
|
||||
// min_resources.
|
||||
// resources, effective_min_resources will be same as configured min_resources.
|
||||
Resource numeratorForMinRatio = null;
|
||||
if (getQueuePath().equals("root")) {
|
||||
if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(resourceCalculator,
|
||||
|
@ -1324,21 +1330,12 @@ public class ParentQueue extends AbstractCSQueue {
|
|||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label),
|
||||
configuredMinResources)) {
|
||||
numeratorForMinRatio = usageTracker.getQueueResourceQuotas()
|
||||
.getEffectiveMinResource(label);
|
||||
numeratorForMinRatio = usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label);
|
||||
}
|
||||
}
|
||||
|
||||
effectiveMinResourceRatio.put(label, getEffectiveMinRatio(
|
||||
configuredMinResources, numeratorForMinRatio));
|
||||
|
||||
// Update effective resources for my self;
|
||||
if (rootQueue) {
|
||||
usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label, resourceByLabel);
|
||||
usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label, resourceByLabel);
|
||||
} else{
|
||||
super.updateEffectiveResources(clusterResource);
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Float> getEffectiveMinRatio(
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -100,6 +101,21 @@ public class TestAbsoluteResourceConfiguration {
|
|||
private static Set<String> resourceTypes = new HashSet<>(
|
||||
Arrays.asList("memory", "vcores"));
|
||||
|
||||
private CapacitySchedulerConfiguration setupNormalizationConfiguration() {
|
||||
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[]{QUEUEA, QUEUEB});
|
||||
csConf.setQueues(QUEUEA_FULL.getFullPath(), new String[]{QUEUEA1, QUEUEA2});
|
||||
|
||||
// 60, 28
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, Resource.newInstance(50 * GB, 20));
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA1_FULL, Resource.newInstance(30 * GB, 15));
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA2_FULL, Resource.newInstance(20 * GB, 5));
|
||||
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, Resource.newInstance(10 * GB, 8));
|
||||
|
||||
return csConf;
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration setupSimpleQueueConfiguration(
|
||||
boolean isCapacityNeeded) {
|
||||
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
||||
|
@ -292,6 +308,37 @@ public class TestAbsoluteResourceConfiguration {
|
|||
rm.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNormalizationAfterNodeRemoval() throws Exception {
|
||||
CapacitySchedulerConfiguration csConf = setupNormalizationConfiguration();
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
MockRM rm = new MockRM(csConf);
|
||||
|
||||
rm.start();
|
||||
rm.registerNode("h1:1234", 8 * GB, 4);
|
||||
rm.registerNode("h2:1234", 8 * GB, 4);
|
||||
rm.registerNode("h3:1234", 8 * GB, 4);
|
||||
MockNM nm = rm.registerNode("h4:1234", 8 * GB, 4);
|
||||
rm.registerNode("h5:1234", 28 * GB, 12);
|
||||
|
||||
// Send a removal event to CS. MockRM#unregisterNode does not reflect the real world scenario,
|
||||
// therefore we manually need to invoke this removal event.
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
cs.handle(new NodeRemovedSchedulerEvent(rm.getRMContext().getRMNodes().get(nm.getNodeId())));
|
||||
|
||||
Resource res = Resources.add(
|
||||
cs.getQueue(QUEUEA1_FULL.getFullPath()).getEffectiveCapacity(""),
|
||||
cs.getQueue(QUEUEA2_FULL.getFullPath()).getEffectiveCapacity(""));
|
||||
Resource resParent = cs.getQueue(QUEUEA_FULL.getFullPath()).getEffectiveCapacity("");
|
||||
|
||||
// Check if there is no overcommitment on behalf of the child queues
|
||||
Assert.assertTrue(String.format("Summarized resource %s of all children is greater than " +
|
||||
"their parent's %s", res, resParent),
|
||||
Resources.lessThan(cs.getResourceCalculator(), cs.getClusterResource(), res, resParent));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEffectiveMinMaxResourceConfigurartionPerQueue()
|
||||
throws Exception {
|
||||
|
|
Loading…
Reference in New Issue