YARN-10579. CS Flexible AQC: Modify RM /scheduler endpoint to include weight values for queues. Contributed by Szilard Nemeth

This commit is contained in:
Szilard Nemeth 2021-01-21 09:23:11 +01:00
parent 8bc2dfbf36
commit 06fef5ee43
7 changed files with 113 additions and 13 deletions

View File

@ -46,6 +46,8 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
protected float capacity; protected float capacity;
protected float usedCapacity; protected float usedCapacity;
protected float maxCapacity; protected float maxCapacity;
protected float weight;
protected float normalizedWeight;
protected String queueName; protected String queueName;
protected CapacitySchedulerQueueInfoList queues; protected CapacitySchedulerQueueInfoList queues;
protected QueueCapacitiesInfo capacities; protected QueueCapacitiesInfo capacities;
@ -70,6 +72,8 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
if (max < EPSILON || max > 1f) if (max < EPSILON || max > 1f)
max = 1f; max = 1f;
this.maxCapacity = max * 100; this.maxCapacity = max * 100;
this.weight = parent.getQueueCapacities().getWeight();
this.normalizedWeight = parent.getQueueCapacities().getNormalizedWeight();
capacities = new QueueCapacitiesInfo(parent.getQueueCapacities(), capacities = new QueueCapacitiesInfo(parent.getQueueCapacities(),
parent.getQueueResourceQuotas(), false); parent.getQueueResourceQuotas(), false);
@ -147,7 +151,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
CapacityScheduler cs, CSQueue parent) { CapacityScheduler cs, CSQueue parent) {
CapacitySchedulerQueueInfoList queuesInfo = CapacitySchedulerQueueInfoList queuesInfo =
new CapacitySchedulerQueueInfoList(); new CapacitySchedulerQueueInfoList();
// JAXB marashalling leads to situation where the "type" field injected // JAXB marshalling leads to situation where the "type" field injected
// for JSON changes from string to array depending on order of printing // for JSON changes from string to array depending on order of printing
// Issue gets fixed if all the leaf queues are marshalled before the // Issue gets fixed if all the leaf queues are marshalled before the
// non-leaf queues. See YARN-4785 for more details. // non-leaf queues. See YARN-4785 for more details.

View File

@ -66,6 +66,8 @@ public class CapacitySchedulerQueueInfo {
protected float absoluteCapacity; protected float absoluteCapacity;
protected float absoluteMaxCapacity; protected float absoluteMaxCapacity;
protected float absoluteUsedCapacity; protected float absoluteUsedCapacity;
protected float weight;
protected float normalizedWeight;
protected int numApplications; protected int numApplications;
protected String queueName; protected String queueName;
protected boolean isAbsoluteResource; protected boolean isAbsoluteResource;
@ -109,6 +111,8 @@ public class CapacitySchedulerQueueInfo {
cap(q.getAbsoluteMaximumCapacity(), 0f, 1f) * 100; cap(q.getAbsoluteMaximumCapacity(), 0f, 1f) * 100;
absoluteUsedCapacity = absoluteUsedCapacity =
cap(q.getAbsoluteUsedCapacity(), 0f, 1f) * 100; cap(q.getAbsoluteUsedCapacity(), 0f, 1f) * 100;
weight = q.getQueueCapacities().getWeight();
normalizedWeight = q.getQueueCapacities().getNormalizedWeight();
numApplications = q.getNumApplications(); numApplications = q.getNumApplications();
allocatedContainers = q.getMetrics().getAllocatedContainers(); allocatedContainers = q.getMetrics().getAllocatedContainers();
pendingContainers = q.getMetrics().getPendingContainers(); pendingContainers = q.getMetrics().getPendingContainers();
@ -314,4 +318,12 @@ public class CapacitySchedulerQueueInfo {
public String getMode() { public String getMode() {
return mode; return mode;
} }
public float getWeight() {
return weight;
}
public float getNormalizedWeight() {
return normalizedWeight;
}
} }

View File

@ -39,6 +39,8 @@ public class PartitionQueueCapacitiesInfo {
private float absoluteUsedCapacity; private float absoluteUsedCapacity;
private float absoluteMaxCapacity = 100; private float absoluteMaxCapacity = 100;
private float maxAMLimitPercentage; private float maxAMLimitPercentage;
private float weight;
private float normalizedWeight;
private ResourceInfo configuredMinResource; private ResourceInfo configuredMinResource;
private ResourceInfo configuredMaxResource; private ResourceInfo configuredMaxResource;
private ResourceInfo effectiveMinResource; private ResourceInfo effectiveMinResource;
@ -50,6 +52,7 @@ public class PartitionQueueCapacitiesInfo {
public PartitionQueueCapacitiesInfo(String partitionName, float capacity, public PartitionQueueCapacitiesInfo(String partitionName, float capacity,
float usedCapacity, float maxCapacity, float absCapacity, float usedCapacity, float maxCapacity, float absCapacity,
float absUsedCapacity, float absMaxCapacity, float maxAMLimitPercentage, float absUsedCapacity, float absMaxCapacity, float maxAMLimitPercentage,
float weight, float normalizedWeight,
Resource confMinRes, Resource confMaxRes, Resource effMinRes, Resource confMinRes, Resource confMaxRes, Resource effMinRes,
Resource effMaxRes) { Resource effMaxRes) {
super(); super();
@ -61,6 +64,8 @@ public class PartitionQueueCapacitiesInfo {
this.absoluteUsedCapacity = absUsedCapacity; this.absoluteUsedCapacity = absUsedCapacity;
this.absoluteMaxCapacity = absMaxCapacity; this.absoluteMaxCapacity = absMaxCapacity;
this.maxAMLimitPercentage = maxAMLimitPercentage; this.maxAMLimitPercentage = maxAMLimitPercentage;
this.weight = weight;
this.normalizedWeight = normalizedWeight;
this.configuredMinResource = new ResourceInfo(confMinRes); this.configuredMinResource = new ResourceInfo(confMinRes);
this.configuredMaxResource = new ResourceInfo(confMaxRes); this.configuredMaxResource = new ResourceInfo(confMaxRes);
this.effectiveMinResource = new ResourceInfo(effMinRes); this.effectiveMinResource = new ResourceInfo(effMinRes);
@ -127,6 +132,22 @@ public class PartitionQueueCapacitiesInfo {
return maxAMLimitPercentage; return maxAMLimitPercentage;
} }
public float getWeight() {
return weight;
}
public void setWeight(float weight) {
this.weight = weight;
}
public float getNormalizedWeight() {
return normalizedWeight;
}
public void setNormalizedWeight(float normalizedWeight) {
this.normalizedWeight = normalizedWeight;
}
public void setMaxAMLimitPercentage(float maxAMLimitPercentage) { public void setMaxAMLimitPercentage(float maxAMLimitPercentage) {
this.maxAMLimitPercentage = maxAMLimitPercentage; this.maxAMLimitPercentage = maxAMLimitPercentage;
} }

View File

@ -52,6 +52,8 @@ public class QueueCapacitiesInfo {
float absUsedCapacity; float absUsedCapacity;
float absMaxCapacity; float absMaxCapacity;
float maxAMLimitPercentage; float maxAMLimitPercentage;
float weight;
float normalizedWeight;
for (String partitionName : capacities.getExistingNodeLabels()) { for (String partitionName : capacities.getExistingNodeLabels()) {
usedCapacity = capacities.getUsedCapacity(partitionName) * 100; usedCapacity = capacities.getUsedCapacity(partitionName) * 100;
capacity = capacities.getCapacity(partitionName) * 100; capacity = capacities.getCapacity(partitionName) * 100;
@ -67,10 +69,13 @@ public class QueueCapacitiesInfo {
if (maxCapacity < CapacitySchedulerQueueInfo.EPSILON || maxCapacity > 1f) if (maxCapacity < CapacitySchedulerQueueInfo.EPSILON || maxCapacity > 1f)
maxCapacity = 1f; maxCapacity = 1f;
maxCapacity = maxCapacity * 100; maxCapacity = maxCapacity * 100;
weight = capacities.getWeight(partitionName);
normalizedWeight = capacities.getNormalizedWeight(partitionName);
queueCapacitiesByPartition.add(new PartitionQueueCapacitiesInfo( queueCapacitiesByPartition.add(new PartitionQueueCapacitiesInfo(
partitionName, capacity, usedCapacity, maxCapacity, absCapacity, partitionName, capacity, usedCapacity, maxCapacity, absCapacity,
absUsedCapacity, absMaxCapacity, absUsedCapacity, absMaxCapacity,
considerAMUsage ? maxAMLimitPercentage : 0f, considerAMUsage ? maxAMLimitPercentage : 0f,
weight, normalizedWeight,
resourceQuotas.getConfiguredMinResource(partitionName), resourceQuotas.getConfiguredMinResource(partitionName),
resourceQuotas.getConfiguredMaxResource(partitionName), resourceQuotas.getConfiguredMaxResource(partitionName),
resourceQuotas.getEffectiveMinResource(partitionName), resourceQuotas.getEffectiveMinResource(partitionName),

View File

@ -362,7 +362,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler"); JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements in: " + info, 1, info.length()); assertEquals("incorrect number of elements in: " + info, 1, info.length());
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements in: " + info, 13, info.length()); assertEquals("incorrect number of elements in: " + info, 15, info.length());
verifyClusterSchedulerGeneric(info.getString("type"), verifyClusterSchedulerGeneric(info.getString("type"),
(float) info.getDouble("usedCapacity"), (float) info.getDouble("usedCapacity"),
(float) info.getDouble("capacity"), (float) info.getDouble("capacity"),
@ -413,10 +413,10 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
private void verifySubQueue(JSONObject info, String q, private void verifySubQueue(JSONObject info, String q,
float parentAbsCapacity, float parentAbsMaxCapacity) float parentAbsCapacity, float parentAbsMaxCapacity)
throws JSONException, Exception { throws JSONException, Exception {
int numExpectedElements = 28; int numExpectedElements = 30;
boolean isParentQueue = true; boolean isParentQueue = true;
if (!info.has("queues")) { if (!info.has("queues")) {
numExpectedElements = 46; numExpectedElements = 48;
isParentQueue = false; isParentQueue = false;
} }
assertEquals("incorrect number of elements", numExpectedElements, info.length()); assertEquals("incorrect number of elements", numExpectedElements, info.length());

View File

@ -63,9 +63,26 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
JerseyTestBase { JerseyTestBase {
private static final Logger LOG = private static final Logger LOG =
LoggerFactory.getLogger(TestRMWebServicesCapacitySchedDynamicConfig.class); LoggerFactory.getLogger(TestRMWebServicesCapacitySchedDynamicConfig.class);
private static final float EXP_WEIGHT_NON_WEIGHT_MODE = -1.0F;
private static final float EXP_NORM_WEIGHT_NON_WEIGHT_MODE = 0.0F;
private static final float EXP_ROOT_WEIGHT_IN_WEIGHT_MODE = 1.0F;
private static final double DELTA = 0.00001;
protected static MockRM rm; protected static MockRM rm;
private static class ExpectedQueueWithProperties {
private String path;
public final float weight;
public final float normalizedWeight;
public ExpectedQueueWithProperties(String path, float weight,
float normalizedWeight) {
this.path = path;
this.weight = weight;
this.normalizedWeight = normalizedWeight;
}
}
private static class WebServletModule extends ServletModule { private static class WebServletModule extends ServletModule {
private final Configuration conf; private final Configuration conf;
@ -124,8 +141,15 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
initResourceManager(config); initResourceManager(config);
JSONObject json = sendRequestToSchedulerEndpoint(); JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "percentage", "root.default", "root.test1", validateSchedulerInfo(json, "percentage",
"root.test2"); new ExpectedQueueWithProperties("root",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.default",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.test1",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.test2",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE));
} }
@Test @Test
@ -138,8 +162,15 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
initResourceManager(config); initResourceManager(config);
JSONObject json = sendRequestToSchedulerEndpoint(); JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "absolute", "root.default", "root.test1", validateSchedulerInfo(json, "absolute",
"root.test2"); new ExpectedQueueWithProperties("root",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.default",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.test1",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.test2",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE));
} }
@Test @Test
@ -152,8 +183,12 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
initResourceManager(config); initResourceManager(config);
JSONObject json = sendRequestToSchedulerEndpoint(); JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "weight", "root.default", "root.test1", validateSchedulerInfo(json, "weight",
"root.test2"); new ExpectedQueueWithProperties("root",
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE),
new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f),
new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f),
new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f));
} }
private JSONObject sendRequestToSchedulerEndpoint() throws Exception { private JSONObject sendRequestToSchedulerEndpoint() throws Exception {
@ -169,7 +204,14 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
} }
private void validateSchedulerInfo(JSONObject json, String expectedMode, private void validateSchedulerInfo(JSONObject json, String expectedMode,
String... expectedQueues) throws JSONException { ExpectedQueueWithProperties rootQueue,
ExpectedQueueWithProperties... expectedQueues) throws JSONException {
Map<String, ExpectedQueueWithProperties> queuesMap = new HashMap<>();
for (ExpectedQueueWithProperties expectedQueue : expectedQueues) {
queuesMap.put(expectedQueue.path, expectedQueue);
}
int expectedQSize = expectedQueues.length; int expectedQSize = expectedQueues.length;
Assert.assertNotNull("SchedulerTypeInfo should not be null", json); Assert.assertNotNull("SchedulerTypeInfo should not be null", json);
assertEquals("incorrect number of elements in: " + json, 1, json.length()); assertEquals("incorrect number of elements in: " + json, 1, json.length());
@ -178,11 +220,15 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
Assert.assertNotNull("Scheduler object should not be null", json); Assert.assertNotNull("Scheduler object should not be null", json);
assertEquals("incorrect number of elements in: " + info, 1, info.length()); assertEquals("incorrect number of elements in: " + info, 1, info.length());
//Validate if root queue has the expected mode //Validate if root queue has the expected mode and weight values
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
Assert.assertNotNull("SchedulerInfo should not be null", info); Assert.assertNotNull("SchedulerInfo should not be null", info);
Assert.assertEquals("Expected Queue mode " +expectedMode, expectedMode, Assert.assertEquals("Expected Queue mode " +expectedMode, expectedMode,
info.getString("mode")); info.getString("mode"));
Assert.assertEquals(rootQueue.weight,
Float.parseFloat(info.getString("weight")), DELTA);
Assert.assertEquals(rootQueue.normalizedWeight,
Float.parseFloat(info.getString("normalizedWeight")), DELTA);
JSONObject queuesObj = info.getJSONObject("queues"); JSONObject queuesObj = info.getJSONObject("queues");
Assert.assertNotNull("QueueInfoList should not be null", queuesObj); Assert.assertNotNull("QueueInfoList should not be null", queuesObj);
@ -200,10 +246,22 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
obj.getString("queueName"); obj.getString("queueName");
String mode = obj.getString("mode"); String mode = obj.getString("mode");
modesMap.put(queuePath, mode); modesMap.put(queuePath, mode);
//validate weights of all other queues
ExpectedQueueWithProperties expectedQueue = queuesMap.get(queuePath);
Assert.assertNotNull("Queue not found in expectedQueueMap with path: " +
queuePath, expectedQueue);
Assert.assertEquals("Weight value does not match",
expectedQueue.weight, Float.parseFloat(obj.getString("weight")),
DELTA);
Assert.assertEquals("Normalized weight value does not match",
expectedQueue.normalizedWeight,
Float.parseFloat(obj.getString("normalizedWeight")), DELTA);
} }
//Validate queue paths and modes //Validate queue paths and modes
List<String> sortedExpectedPaths = Arrays.stream(expectedQueues) List<String> sortedExpectedPaths = Arrays.stream(expectedQueues)
.map(eq -> eq.path)
.sorted(Comparator.comparing(String::toLowerCase)) .sorted(Comparator.comparing(String::toLowerCase))
.collect(Collectors.toList()); .collect(Collectors.toList());

View File

@ -574,7 +574,7 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler"); JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length()); assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 13, info.length()); assertEquals("incorrect number of elements", 15, info.length());
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES); JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray = JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION); capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);