YARN-10364. Fix logic of queue capacity is absolute resource or percentage.
Contributed by Bilwa ST. Reviewed by Sunil G.
This commit is contained in:
parent
c2a17659d1
commit
5e0f879779
|
@ -84,6 +84,14 @@ public class AbstractAutoCreatedLeafQueue extends LeafQueue {
|
||||||
label);
|
label);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
|
||||||
|
String label) {
|
||||||
|
return super.checkConfigTypeIsAbsoluteResource(csContext.getConfiguration()
|
||||||
|
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
|
||||||
|
label);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This methods to change capacity for a queue and adjusts its
|
* This methods to change capacity for a queue and adjusts its
|
||||||
* absoluteCapacity
|
* absoluteCapacity
|
||||||
|
|
|
@ -542,6 +542,12 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
return maxResource;
|
return maxResource;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected boolean checkConfigTypeIsAbsoluteResource(String queuePath,
|
||||||
|
String label) {
|
||||||
|
return csContext.getConfiguration().checkConfigTypeIsAbsoluteResource(label,
|
||||||
|
queuePath, resourceTypes);
|
||||||
|
}
|
||||||
|
|
||||||
protected void updateConfigurableResourceRequirement(String queuePath,
|
protected void updateConfigurableResourceRequirement(String queuePath,
|
||||||
Resource clusterResource) {
|
Resource clusterResource) {
|
||||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||||
|
@ -554,17 +560,18 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
LOG.debug("capacityConfigType is '{}' for queue {}",
|
LOG.debug("capacityConfigType is '{}' for queue {}",
|
||||||
capacityConfigType, getQueuePath());
|
capacityConfigType, getQueuePath());
|
||||||
|
|
||||||
|
CapacityConfigType localType = checkConfigTypeIsAbsoluteResource(
|
||||||
|
queuePath, label) ? CapacityConfigType.ABSOLUTE_RESOURCE
|
||||||
|
: CapacityConfigType.PERCENTAGE;
|
||||||
|
|
||||||
if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
|
if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
|
||||||
this.capacityConfigType = (!minResource.equals(Resources.none())
|
this.capacityConfigType = localType;
|
||||||
&& queueCapacities.getAbsoluteCapacity(label) == 0f)
|
|
||||||
? CapacityConfigType.ABSOLUTE_RESOURCE
|
|
||||||
: CapacityConfigType.PERCENTAGE;
|
|
||||||
LOG.debug("capacityConfigType is updated as '{}' for queue {}",
|
LOG.debug("capacityConfigType is updated as '{}' for queue {}",
|
||||||
capacityConfigType, getQueuePath());
|
capacityConfigType, getQueuePath());
|
||||||
|
} else {
|
||||||
|
validateAbsoluteVsPercentageCapacityConfig(localType);
|
||||||
}
|
}
|
||||||
|
|
||||||
validateAbsoluteVsPercentageCapacityConfig(minResource);
|
|
||||||
|
|
||||||
// If min resource for a resource type is greater than its max resource,
|
// If min resource for a resource type is greater than its max resource,
|
||||||
// throw exception to handle such error configs.
|
// throw exception to handle such error configs.
|
||||||
if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
|
if (!maxResource.equals(Resources.none()) && Resources.greaterThan(
|
||||||
|
@ -607,12 +614,7 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void validateAbsoluteVsPercentageCapacityConfig(
|
private void validateAbsoluteVsPercentageCapacityConfig(
|
||||||
Resource minResource) {
|
CapacityConfigType localType) {
|
||||||
CapacityConfigType localType = CapacityConfigType.PERCENTAGE;
|
|
||||||
if (!minResource.equals(Resources.none())) {
|
|
||||||
localType = CapacityConfigType.ABSOLUTE_RESOURCE;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!queuePath.equals("root")
|
if (!queuePath.equals("root")
|
||||||
&& !this.capacityConfigType.equals(localType)) {
|
&& !this.capacityConfigType.equals(localType)) {
|
||||||
throw new IllegalArgumentException("Queue '" + getQueuePath()
|
throw new IllegalArgumentException("Queue '" + getQueuePath()
|
||||||
|
|
|
@ -2166,6 +2166,21 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||||
set(prefix, resourceString.toString());
|
set(prefix, resourceString.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean checkConfigTypeIsAbsoluteResource(String label, String queue,
|
||||||
|
Set<String> resourceTypes) {
|
||||||
|
String propertyName = getNodeLabelPrefix(queue, label) + CAPACITY;
|
||||||
|
String resourceString = get(propertyName);
|
||||||
|
if (resourceString == null || resourceString.isEmpty()) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Matcher matcher = RESOURCE_PATTERN.matcher(resourceString);
|
||||||
|
if (matcher.find()) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
private Resource internalGetLabeledResourceRequirementForQueue(String queue,
|
private Resource internalGetLabeledResourceRequirementForQueue(String queue,
|
||||||
String label, Set<String> resourceTypes, String suffix) {
|
String label, Set<String> resourceTypes, String suffix) {
|
||||||
String propertyName = getNodeLabelPrefix(queue, label) + suffix;
|
String propertyName = getNodeLabelPrefix(queue, label) + suffix;
|
||||||
|
|
|
@ -524,6 +524,57 @@ public class TestAbsoluteResourceConfiguration {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testValidateAbsoluteResourceConfig() throws Exception {
|
||||||
|
/**
|
||||||
|
* Queue structure is as follows. root / a / \ a1 a2
|
||||||
|
*
|
||||||
|
* Test below cases: 1) Test ConfigType when resource is [memory=0]
|
||||||
|
*/
|
||||||
|
|
||||||
|
// create conf with basic queue configuration.
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
new CapacitySchedulerConfiguration();
|
||||||
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {QUEUEA, QUEUEB});
|
||||||
|
csConf.setQueues(QUEUEA_FULL, new String[] {QUEUEA1, QUEUEA2});
|
||||||
|
|
||||||
|
// Set default capacities like normal configuration.
|
||||||
|
csConf.setCapacity(QUEUEA_FULL, "[memory=125]");
|
||||||
|
csConf.setCapacity(QUEUEB_FULL, "[memory=0]");
|
||||||
|
csConf.setCapacity(QUEUEA1_FULL, "[memory=100]");
|
||||||
|
csConf.setCapacity(QUEUEA2_FULL, "[memory=25]");
|
||||||
|
|
||||||
|
// Update min/max resource to queueA
|
||||||
|
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
|
||||||
|
csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES);
|
||||||
|
|
||||||
|
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||||
|
ResourceScheduler.class);
|
||||||
|
|
||||||
|
@SuppressWarnings("resource")
|
||||||
|
MockRM rm = new MockRM(csConf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Add few nodes
|
||||||
|
rm.registerNode("127.0.0.1:1234", 125 * GB, 20);
|
||||||
|
|
||||||
|
// Set [memory=0] to one of the queue and see if reinitialization
|
||||||
|
// doesnt throw exception saying "Parent queue 'root.A' and
|
||||||
|
// child queue 'root.A.A2' should use either percentage
|
||||||
|
// based capacityconfiguration or absolute resource together for label"
|
||||||
|
csConf.setCapacity(QUEUEA1_FULL, "[memory=125]");
|
||||||
|
csConf.setCapacity(QUEUEA2_FULL, "[memory=0]");
|
||||||
|
|
||||||
|
// Get queue object to verify min/max resource configuration.
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
try {
|
||||||
|
cs.reinitialize(csConf, rm.getRMContext());
|
||||||
|
} catch (IOException e) {
|
||||||
|
Assert.fail(e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testEffectiveResourceAfterReducingClusterResource()
|
public void testEffectiveResourceAfterReducingClusterResource()
|
||||||
throws Exception {
|
throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue