YARN-10581. CS Flexible Auto Queue Creation: Modify RM /scheduler endpoint to include queue creation type for queues. Contributed by Szilard Nemeth.

This commit is contained in:
Peter Bacsko 2021-01-21 18:06:53 +01:00
parent 06fef5ee43
commit 17fdac8de5
6 changed files with 267 additions and 36 deletions

View File

@ -57,6 +57,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
protected int queuePriority; protected int queuePriority;
protected String orderingPolicyInfo; protected String orderingPolicyInfo;
protected String mode; protected String mode;
protected String queueType;
@XmlTransient @XmlTransient
static final float EPSILON = 1e-8f; static final float EPSILON = 1e-8f;
@ -105,6 +106,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
.getConfigName(); .getConfigName();
} }
mode = CapacitySchedulerInfoHelper.getMode(parent); mode = CapacitySchedulerInfoHelper.getMode(parent);
queueType = CapacitySchedulerInfoHelper.getQueueType(parent);
} }
public float getCapacity() { public float getCapacity() {
@ -185,4 +187,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
return mode; return mode;
} }
public String getQueueType() {
return queueType;
}
} }

View File

@ -90,6 +90,7 @@ public class CapacitySchedulerQueueInfo {
protected boolean autoCreateChildQueueEnabled; protected boolean autoCreateChildQueueEnabled;
protected LeafQueueTemplateInfo leafQueueTemplate; protected LeafQueueTemplateInfo leafQueueTemplate;
protected String mode; protected String mode;
protected String queueType;
CapacitySchedulerQueueInfo() { CapacitySchedulerQueueInfo() {
}; };
@ -135,6 +136,7 @@ public class CapacitySchedulerQueueInfo {
populateQueueCapacities(qCapacities, qResQuotas); populateQueueCapacities(qCapacities, qResQuotas);
mode = CapacitySchedulerInfoHelper.getMode(q); mode = CapacitySchedulerInfoHelper.getMode(q);
queueType = CapacitySchedulerInfoHelper.getQueueType(q);
ResourceUsage queueResourceUsage = q.getQueueResourceUsage(); ResourceUsage queueResourceUsage = q.getQueueResourceUsage();
populateQueueResourceUsage(queueResourceUsage); populateQueueResourceUsage(queueResourceUsage);
@ -319,6 +321,10 @@ public class CapacitySchedulerQueueInfo {
return mode; return mode;
} }
public String getQueueType() {
return queueType;
}
public float getWeight() { public float getWeight() {
return weight; return weight;
} }

View File

@ -18,9 +18,17 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
public class CapacitySchedulerInfoHelper { public class CapacitySchedulerInfoHelper {
private static final String AUTO_CREATED_LEAF = "autoCreatedLeaf";
private static final String STATIC_LEAF = "staticLeaf";
private static final String AUTO_CREATED_PARENT = "autoCreatedParent";
private static final String STATIC_PARENT = "staticParent";
private static final String UNKNOWN_QUEUE = "unknown";
private CapacitySchedulerInfoHelper() {} private CapacitySchedulerInfoHelper() {}
@ -41,4 +49,22 @@ public class CapacitySchedulerInfoHelper {
throw new YarnRuntimeException("Unknown mode for queue: " + throw new YarnRuntimeException("Unknown mode for queue: " +
queue.getQueuePath() + ". Queue details: " + queue); queue.getQueuePath() + ". Queue details: " + queue);
} }
public static String getQueueType(CSQueue queue) {
if (queue instanceof LeafQueue) {
if (((AbstractCSQueue)queue).isDynamicQueue()) {
return AUTO_CREATED_LEAF;
} else {
return STATIC_LEAF;
}
} else if (queue instanceof ParentQueue) {
if (((AbstractCSQueue)queue).isDynamicQueue()) {
return AUTO_CREATED_PARENT;
} else {
//A ParentQueue with isDynamic=false or an AbstractManagedParentQueue
return STATIC_PARENT;
}
}
return UNKNOWN_QUEUE;
}
} }

View File

@ -362,7 +362,7 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler"); JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements in: " + info, 1, info.length()); assertEquals("incorrect number of elements in: " + info, 1, info.length());
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements in: " + info, 15, info.length()); assertEquals("incorrect number of elements in: " + info, 16, info.length());
verifyClusterSchedulerGeneric(info.getString("type"), verifyClusterSchedulerGeneric(info.getString("type"),
(float) info.getDouble("usedCapacity"), (float) info.getDouble("usedCapacity"),
(float) info.getDouble("capacity"), (float) info.getDouble("capacity"),
@ -413,10 +413,10 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
private void verifySubQueue(JSONObject info, String q, private void verifySubQueue(JSONObject info, String q,
float parentAbsCapacity, float parentAbsMaxCapacity) float parentAbsCapacity, float parentAbsMaxCapacity)
throws JSONException, Exception { throws JSONException, Exception {
int numExpectedElements = 30; int numExpectedElements = 31;
boolean isParentQueue = true; boolean isParentQueue = true;
if (!info.has("queues")) { if (!info.has("queues")) {
numExpectedElements = 48; numExpectedElements = 49;
isParentQueue = false; isParentQueue = false;
} }
assertEquals("incorrect number of elements", numExpectedElements, info.length()); assertEquals("incorrect number of elements", numExpectedElements, info.length());

View File

@ -35,11 +35,16 @@ import javax.ws.rs.core.MediaType;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.JettyUtils; import org.apache.hadoop.http.JettyUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerAutoQueueHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.GuiceServletConfig; import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
import org.apache.hadoop.yarn.webapp.JerseyTestBase; import org.apache.hadoop.yarn.webapp.JerseyTestBase;
@ -66,20 +71,30 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
private static final float EXP_WEIGHT_NON_WEIGHT_MODE = -1.0F; private static final float EXP_WEIGHT_NON_WEIGHT_MODE = -1.0F;
private static final float EXP_NORM_WEIGHT_NON_WEIGHT_MODE = 0.0F; private static final float EXP_NORM_WEIGHT_NON_WEIGHT_MODE = 0.0F;
private static final float EXP_ROOT_WEIGHT_IN_WEIGHT_MODE = 1.0F; private static final float EXP_ROOT_WEIGHT_IN_WEIGHT_MODE = 1.0F;
private static final float EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE = 1.0F;
private static final double DELTA = 0.00001; private static final double DELTA = 0.00001;
private static final String STATIC_PARENT = "staticParent";
private static final String STATIC_LEAF = "staticLeaf";
private static final int GB = 1024;
private static final String AUTO_CREATED_LEAF = "autoCreatedLeaf";
private static final String AUTO_CREATED_PARENT = "autoCreatedParent";
protected static MockRM RM;
protected static MockRM rm; private CapacitySchedulerAutoQueueHandler autoQueueHandler;
private CapacitySchedulerConfiguration csConf;
private static class ExpectedQueueWithProperties { private static class ExpectedQueueWithProperties {
private String path; private String path;
public final float weight; public final float weight;
public final float normalizedWeight; public final float normalizedWeight;
private String queueType;
public ExpectedQueueWithProperties(String path, float weight, public ExpectedQueueWithProperties(String path, float weight,
float normalizedWeight) { float normalizedWeight, String queueType) {
this.path = path; this.path = path;
this.weight = weight; this.weight = weight;
this.normalizedWeight = normalizedWeight; this.normalizedWeight = normalizedWeight;
this.queueType = queueType;
} }
} }
@ -99,8 +114,8 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
ResourceScheduler.class); ResourceScheduler.class);
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
rm = new MockRM(conf); RM = new MockRM(conf);
bind(ResourceManager.class).toInstance(rm); bind(ResourceManager.class).toInstance(RM);
serve("/*").with(GuiceContainer.class); serve("/*").with(GuiceContainer.class);
} }
} }
@ -114,13 +129,15 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
private void initResourceManager(Configuration conf) throws IOException { private void initResourceManager(Configuration conf) throws IOException {
GuiceServletConfig.setInjector( GuiceServletConfig.setInjector(
Guice.createInjector(new WebServletModule(conf))); Guice.createInjector(new WebServletModule(conf)));
rm.start(); RM.start();
//Need to call reinitialize as //Need to call reinitialize as
//MutableCSConfigurationProvider with InMemoryConfigurationStore //MutableCSConfigurationProvider with InMemoryConfigurationStore
//somehow does not load the queues properly and falls back to default config. //somehow does not load the queues properly and falls back to default config.
//Therefore CS will think there's only the default queue there. //Therefore CS will think there's only the default queue there.
((CapacityScheduler)rm.getResourceScheduler()).reinitialize(conf, ((CapacityScheduler) RM.getResourceScheduler()).reinitialize(conf,
rm.getRMContext(), true); RM.getRMContext(), true);
CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler();
csConf = cs.getConfiguration();
} }
public TestRMWebServicesCapacitySchedDynamicConfig() { public TestRMWebServicesCapacitySchedDynamicConfig() {
@ -143,13 +160,17 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
JSONObject json = sendRequestToSchedulerEndpoint(); JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "percentage", validateSchedulerInfo(json, "percentage",
new ExpectedQueueWithProperties("root", new ExpectedQueueWithProperties("root",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE), EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_PARENT),
new ExpectedQueueWithProperties("root.default", new ExpectedQueueWithProperties("root.default",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE), EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test1", new ExpectedQueueWithProperties("root.test1",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE), EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test2", new ExpectedQueueWithProperties("root.test2",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE)); EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_LEAF));
} }
@Test @Test
@ -164,13 +185,17 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
JSONObject json = sendRequestToSchedulerEndpoint(); JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "absolute", validateSchedulerInfo(json, "absolute",
new ExpectedQueueWithProperties("root", new ExpectedQueueWithProperties("root",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE), EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_PARENT),
new ExpectedQueueWithProperties("root.default", new ExpectedQueueWithProperties("root.default",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE), EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test1", new ExpectedQueueWithProperties("root.test1",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE), EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test2", new ExpectedQueueWithProperties("root.test2",
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE)); EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
STATIC_LEAF));
} }
@Test @Test
@ -185,10 +210,99 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
JSONObject json = sendRequestToSchedulerEndpoint(); JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "weight", validateSchedulerInfo(json, "weight",
new ExpectedQueueWithProperties("root", new ExpectedQueueWithProperties("root",
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE), EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE,
new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f), STATIC_PARENT),
new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f), new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f,
new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f)); STATIC_LEAF),
new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f,
STATIC_LEAF));
}
@Test
public void testSchedulerResponseWeightModeWithAutoCreatedQueues()
throws Exception {
Configuration config = CSConfigGenerator
.createWeightConfigWithAutoQueueCreationEnabled();
config.set(YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
initResourceManager(config);
initAutoQueueHandler();
JSONObject json = sendRequestToSchedulerEndpoint();
validateSchedulerInfo(json, "weight",
new ExpectedQueueWithProperties("root",
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE,
STATIC_PARENT),
new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f,
STATIC_LEAF));
//Now create some auto created queues
createQueue("root.auto1");
createQueue("root.auto2");
createQueue("root.auto3");
createQueue("root.autoParent1.auto4");
json = sendRequestToSchedulerEndpoint();
//root.auto1=1w, root.auto2=1w, root.auto3=1w
//root.default=10w, root.test1=4w, root.test2=6w
//root.autoparent1=1w
int sumOfWeights = 24;
ExpectedQueueWithProperties expectedRootQ =
new ExpectedQueueWithProperties("root",
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE,
STATIC_PARENT);
validateSchedulerInfo(json, "weight",
expectedRootQ,
new ExpectedQueueWithProperties("root.auto1",
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
AUTO_CREATED_LEAF),
new ExpectedQueueWithProperties("root.auto2",
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
AUTO_CREATED_LEAF),
new ExpectedQueueWithProperties("root.auto3",
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
AUTO_CREATED_LEAF),
new ExpectedQueueWithProperties("root.autoParent1",
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
AUTO_CREATED_PARENT),
new ExpectedQueueWithProperties("root.default", 10.0f,
10.0f / sumOfWeights,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test1", 4.0f,
4.0f / sumOfWeights,
STATIC_LEAF),
new ExpectedQueueWithProperties("root.test2", 6.0f,
6.0f / sumOfWeights,
STATIC_LEAF));
validateChildrenOfParent(json, "root.autoParent1", "weight",
expectedRootQ,
new ExpectedQueueWithProperties("root.autoParent1.auto4",
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
AUTO_CREATED_LEAF));
}
private void initAutoQueueHandler() throws Exception {
CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler();
autoQueueHandler = new CapacitySchedulerAutoQueueHandler(
cs.getCapacitySchedulerQueueManager());
MockNM nm1 = RM.registerNode("h1:1234", 1200 * GB); // label = x
}
private LeafQueue createQueue(String queuePath) throws YarnException {
return autoQueueHandler.autoCreateQueue(
CSQueueUtils.extractQueuePath(queuePath));
} }
private JSONObject sendRequestToSchedulerEndpoint() throws Exception { private JSONObject sendRequestToSchedulerEndpoint() throws Exception {
@ -206,44 +320,103 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
private void validateSchedulerInfo(JSONObject json, String expectedMode, private void validateSchedulerInfo(JSONObject json, String expectedMode,
ExpectedQueueWithProperties rootQueue, ExpectedQueueWithProperties rootQueue,
ExpectedQueueWithProperties... expectedQueues) throws JSONException { ExpectedQueueWithProperties... expectedQueues) throws JSONException {
Map<String, ExpectedQueueWithProperties> queuesMap = new HashMap<>();
for (ExpectedQueueWithProperties expectedQueue : expectedQueues) {
queuesMap.put(expectedQueue.path, expectedQueue);
}
int expectedQSize = expectedQueues.length;
Assert.assertNotNull("SchedulerTypeInfo should not be null", json); Assert.assertNotNull("SchedulerTypeInfo should not be null", json);
assertEquals("incorrect number of elements in: " + json, 1, json.length()); assertEquals("incorrect number of elements in: " + json, 1, json.length());
JSONObject info = verifySchedulerJSONObject(json);
info = verifySchedulerInfoJSONObject(expectedMode, rootQueue, info);
JSONArray queueArray = verifyQueueJSONListObject(info,
expectedQueues.length);
verifyQueues(CapacitySchedulerConfiguration.ROOT, expectedMode,
queueArray, expectedQueues);
}
private void validateChildrenOfParent(JSONObject json,
String parentPath, String expectedMode,
ExpectedQueueWithProperties rootQueue,
ExpectedQueueWithProperties... expectedLeafQueues) throws JSONException {
Assert.assertNotNull("SchedulerTypeInfo should not be null", json);
assertEquals("incorrect number of elements in: " + json, 1, json.length());
JSONObject info = verifySchedulerJSONObject(json);
info = verifySchedulerInfoJSONObject(expectedMode, rootQueue, info);
JSONArray queueArray = getQueuesJSONListObject(info);
Set<String> verifiedQueues = new HashSet<>();
for (int i = 0; i < queueArray.length(); i++) {
JSONObject childQueueObj = queueArray.getJSONObject(i);
String queuePath = CapacitySchedulerConfiguration.ROOT + "." +
childQueueObj.getString("queueName");
if (queuePath.equals(parentPath)) {
JSONArray childQueueArray = verifyQueueJSONListObject(childQueueObj,
expectedLeafQueues.length);
verifyQueues(parentPath, expectedMode, childQueueArray,
expectedLeafQueues);
verifiedQueues.add(queuePath);
}
}
Assert.assertEquals("Not all child queues were found. " +
String.format("Found queues: %s, All queues: %s", verifiedQueues,
Arrays.stream(expectedLeafQueues).map(lq -> lq.path)
.collect(Collectors.toList())),
expectedLeafQueues.length, verifiedQueues.size());
}
private JSONObject verifySchedulerJSONObject(JSONObject json)
throws JSONException {
JSONObject info = json.getJSONObject("scheduler"); JSONObject info = json.getJSONObject("scheduler");
Assert.assertNotNull("Scheduler object should not be null", json); Assert.assertNotNull("Scheduler object should not be null", json);
assertEquals("incorrect number of elements in: " + info, 1, info.length()); assertEquals("incorrect number of elements in: " + info, 1, info.length());
return info;
}
private JSONObject verifySchedulerInfoJSONObject(String expectedMode,
ExpectedQueueWithProperties rootQueue, JSONObject info)
throws JSONException {
//Validate if root queue has the expected mode and weight values //Validate if root queue has the expected mode and weight values
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
Assert.assertNotNull("SchedulerInfo should not be null", info); Assert.assertNotNull("SchedulerInfo should not be null", info);
Assert.assertEquals("Expected Queue mode " +expectedMode, expectedMode, Assert.assertEquals("Expected Queue mode " + expectedMode, expectedMode,
info.getString("mode")); info.getString("mode"));
Assert.assertEquals(rootQueue.weight, Assert.assertEquals(rootQueue.weight,
Float.parseFloat(info.getString("weight")), DELTA); Float.parseFloat(info.getString("weight")), DELTA);
Assert.assertEquals(rootQueue.normalizedWeight, Assert.assertEquals(rootQueue.normalizedWeight,
Float.parseFloat(info.getString("normalizedWeight")), DELTA); Float.parseFloat(info.getString("normalizedWeight")), DELTA);
return info;
}
private JSONArray verifyQueueJSONListObject(JSONObject info,
int expectedQSize) throws JSONException {
JSONArray queueArray = getQueuesJSONListObject(info);
assertEquals("QueueInfoList should be size of " + expectedQSize,
expectedQSize, queueArray.length());
return queueArray;
}
private JSONArray getQueuesJSONListObject(JSONObject info)
throws JSONException {
JSONObject queuesObj = info.getJSONObject("queues"); JSONObject queuesObj = info.getJSONObject("queues");
Assert.assertNotNull("QueueInfoList should not be null", queuesObj); Assert.assertNotNull("QueueInfoList should not be null", queuesObj);
JSONArray queueArray = queuesObj.getJSONArray("queue"); JSONArray queueArray = queuesObj.getJSONArray("queue");
Assert.assertNotNull("Queue list should not be null", queueArray); Assert.assertNotNull("Queue list should not be null", queueArray);
assertEquals("QueueInfoList should be size of " + expectedQSize, return queueArray;
expectedQSize, queueArray.length()); }
private void verifyQueues(String parentPath, String expectedMode,
JSONArray queueArray, ExpectedQueueWithProperties[] expectedQueues)
throws JSONException {
Map<String, ExpectedQueueWithProperties> queuesMap = new HashMap<>();
for (ExpectedQueueWithProperties expectedQueue : expectedQueues) {
queuesMap.put(expectedQueue.path, expectedQueue);
}
// Create mapping of queue path -> mode // Create mapping of queue path -> mode
Map<String, String> modesMap = new HashMap<>(); Map<String, String> modesMap = new HashMap<>();
for (int i = 0; i < queueArray.length(); i++) { for (int i = 0; i < queueArray.length(); i++) {
JSONObject obj = queueArray.getJSONObject(i); JSONObject obj = queueArray.getJSONObject(i);
String queuePath = CapacitySchedulerConfiguration.ROOT + "." + String queuePath = parentPath + "." + obj.getString("queueName");
obj.getString("queueName");
String mode = obj.getString("mode"); String mode = obj.getString("mode");
modesMap.put(queuePath, mode); modesMap.put(queuePath, mode);
@ -254,9 +427,15 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
Assert.assertEquals("Weight value does not match", Assert.assertEquals("Weight value does not match",
expectedQueue.weight, Float.parseFloat(obj.getString("weight")), expectedQueue.weight, Float.parseFloat(obj.getString("weight")),
DELTA); DELTA);
Assert.assertEquals("Normalized weight value does not match", Assert.assertEquals("Normalized weight value does not match for queue " +
queuePath,
expectedQueue.normalizedWeight, expectedQueue.normalizedWeight,
Float.parseFloat(obj.getString("normalizedWeight")), DELTA); Float.parseFloat(obj.getString("normalizedWeight")), DELTA);
//validate queue creation type
Assert.assertEquals("Queue creation type does not match for queue " +
queuePath,
expectedQueue.queueType, obj.getString("queueType"));
} }
//Validate queue paths and modes //Validate queue paths and modes
@ -308,6 +487,14 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
} }
public static Configuration createWeightConfig() { public static Configuration createWeightConfig() {
return createWeightConfigInternal(false);
}
public static Configuration createWeightConfigWithAutoQueueCreationEnabled() {
return createWeightConfigInternal(true);
}
private static Configuration createWeightConfigInternal(boolean enableAqc) {
Map<String, String> conf = new HashMap<>(); Map<String, String> conf = new HashMap<>();
conf.put("yarn.scheduler.capacity.root.queues", "default, test1, test2"); conf.put("yarn.scheduler.capacity.root.queues", "default, test1, test2");
conf.put("yarn.scheduler.capacity.root.capacity", "1w"); conf.put("yarn.scheduler.capacity.root.capacity", "1w");
@ -316,6 +503,13 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
conf.put("yarn.scheduler.capacity.root.test2.capacity", "6w"); conf.put("yarn.scheduler.capacity.root.test2.capacity", "6w");
conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING"); conf.put("yarn.scheduler.capacity.root.test1.state", "RUNNING");
conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING"); conf.put("yarn.scheduler.capacity.root.test2.state", "RUNNING");
if (enableAqc) {
conf.put("yarn.scheduler.capacity.root.auto-queue-creation-v2.enabled",
"true");
conf.put("yarn.scheduler.capacity.root.default." +
"auto-queue-creation-v2.enabled", "true");
}
return createConfiguration(conf); return createConfiguration(conf);
} }

View File

@ -574,7 +574,7 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
JSONObject info = json.getJSONObject("scheduler"); JSONObject info = json.getJSONObject("scheduler");
assertEquals("incorrect number of elements", 1, info.length()); assertEquals("incorrect number of elements", 1, info.length());
info = info.getJSONObject("schedulerInfo"); info = info.getJSONObject("schedulerInfo");
assertEquals("incorrect number of elements", 15, info.length()); assertEquals("incorrect number of elements", 16, info.length());
JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES); JSONObject capacitiesJsonObject = info.getJSONObject(CAPACITIES);
JSONArray partitionsCapsArray = JSONArray partitionsCapsArray =
capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION); capacitiesJsonObject.getJSONArray(QUEUE_CAPACITIES_BY_PARTITION);