YARN-11031. Improve the maintainability of RM webapp tests like TestRMWebServicesCapacitySched. Contributed by Tamas Domok
This commit is contained in:
parent
1364847fdd
commit
19430118c6
|
@ -455,6 +455,8 @@
|
|||
during build time -->
|
||||
<exclude>src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/MappingRulesDescription.java</exclude>
|
||||
<exclude>src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/placement/schema/Rule.java</exclude>
|
||||
<exclude>src/test/resources/webapp/*.json</exclude>
|
||||
<exclude>src/test/resources/webapp/*.xml</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
|
|
|
@ -24,9 +24,6 @@ import javax.xml.bind.annotation.XmlRootElement;
|
|||
import javax.xml.bind.annotation.XmlTransient;
|
||||
import javax.xml.bind.annotation.XmlType;
|
||||
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
|
@ -35,9 +32,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQu
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Map;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo.getSortedQueueAclInfoList;
|
||||
|
||||
@XmlRootElement(name = "capacityScheduler")
|
||||
@XmlType(name = "capacityScheduler")
|
||||
@XmlAccessorType(XmlAccessType.FIELD)
|
||||
|
@ -94,21 +92,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
|
|||
|
||||
CapacitySchedulerConfiguration conf = cs.getConfiguration();
|
||||
queueAcls = new QueueAclsInfo();
|
||||
for (Map.Entry<AccessType, AccessControlList> e : conf
|
||||
.getAcls(queueName).entrySet()) {
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(e.getKey().toString(),
|
||||
e.getValue().getAclString());
|
||||
queueAcls.add(queueAcl);
|
||||
}
|
||||
|
||||
String aclApplicationMaxPriority = "acl_" +
|
||||
StringUtils.toLowerCase(AccessType.APPLICATION_MAX_PRIORITY.toString());
|
||||
String priorityAcls = conf.get(parent.getQueuePath()
|
||||
+ aclApplicationMaxPriority, conf.ALL_ACL);
|
||||
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(
|
||||
AccessType.APPLICATION_MAX_PRIORITY.toString(), priorityAcls);
|
||||
queueAcls.add(queueAcl);
|
||||
queueAcls.addAll(getSortedQueueAclInfoList(queueName, conf));
|
||||
|
||||
queuePriority = parent.getPriority().getPriority();
|
||||
if (parent instanceof ParentQueue) {
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
|
@ -160,21 +161,7 @@ public class CapacitySchedulerQueueInfo {
|
|||
|
||||
CapacitySchedulerConfiguration conf = cs.getConfiguration();
|
||||
queueAcls = new QueueAclsInfo();
|
||||
for (Map.Entry<AccessType, AccessControlList> e : conf
|
||||
.getAcls(queuePath).entrySet()) {
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(e.getKey().toString(),
|
||||
e.getValue().getAclString());
|
||||
queueAcls.add(queueAcl);
|
||||
}
|
||||
|
||||
String aclApplicationMaxPriority = "acl_" +
|
||||
StringUtils.toLowerCase(AccessType.APPLICATION_MAX_PRIORITY.toString());
|
||||
String priorityAcls = conf.get(CapacitySchedulerConfiguration
|
||||
.getQueuePrefix(queuePath) + aclApplicationMaxPriority, conf.ALL_ACL);
|
||||
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(
|
||||
AccessType.APPLICATION_MAX_PRIORITY.toString(), priorityAcls);
|
||||
queueAcls.add(queueAcl);
|
||||
queueAcls.addAll(getSortedQueueAclInfoList(queuePath, conf));
|
||||
|
||||
queuePriority = q.getPriority().getPriority();
|
||||
if (q instanceof ParentQueue) {
|
||||
|
@ -202,6 +189,29 @@ public class CapacitySchedulerQueueInfo {
|
|||
leafQueueTemplate = new LeafQueueTemplateInfo(conf, queuePath);
|
||||
}
|
||||
|
||||
public static ArrayList<QueueAclInfo> getSortedQueueAclInfoList(String queuePath,
|
||||
CapacitySchedulerConfiguration conf) {
|
||||
ArrayList<QueueAclInfo> queueAclsInfo = new ArrayList<>();
|
||||
for (Map.Entry<AccessType, AccessControlList> e : conf
|
||||
.getAcls(queuePath).entrySet()) {
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(e.getKey().toString(),
|
||||
e.getValue().getAclString());
|
||||
queueAclsInfo.add(queueAcl);
|
||||
}
|
||||
|
||||
String aclApplicationMaxPriority = "acl_" +
|
||||
StringUtils.toLowerCase(AccessType.APPLICATION_MAX_PRIORITY.toString());
|
||||
String priorityAcls = conf.get(CapacitySchedulerConfiguration
|
||||
.getQueuePrefix(queuePath) + aclApplicationMaxPriority,
|
||||
CapacitySchedulerConfiguration.ALL_ACL);
|
||||
|
||||
QueueAclInfo queueAcl = new QueueAclInfo(
|
||||
AccessType.APPLICATION_MAX_PRIORITY.toString(), priorityAcls);
|
||||
queueAclsInfo.add(queueAcl);
|
||||
queueAclsInfo.sort(Comparator.comparing(QueueAclInfo::getAccessType));
|
||||
return queueAclsInfo;
|
||||
}
|
||||
|
||||
protected void populateQueueResourceUsage(ResourceUsage queueResourceUsage) {
|
||||
resources = new ResourcesInfo(queueResourceUsage, false);
|
||||
}
|
||||
|
|
|
@ -18,24 +18,34 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.StringReader;
|
||||
import java.io.StringWriter;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.transform.OutputKeys;
|
||||
import javax.xml.transform.Transformer;
|
||||
import javax.xml.transform.TransformerException;
|
||||
import javax.xml.transform.TransformerFactory;
|
||||
import javax.xml.transform.dom.DOMSource;
|
||||
import javax.xml.transform.stream.StreamResult;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
|
@ -50,63 +60,24 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
import org.apache.hadoop.yarn.webapp.WebServicesTestUtils;
|
||||
import org.codehaus.jettison.json.JSONArray;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
import org.xml.sax.InputSource;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.CAPACITY;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
||||
|
||||
protected static MockRM rm;
|
||||
protected static CapacitySchedulerConfiguration csConf;
|
||||
protected static YarnConfiguration conf;
|
||||
|
||||
private class QueueInfo {
|
||||
float capacity;
|
||||
float usedCapacity;
|
||||
float maxCapacity;
|
||||
float absoluteCapacity;
|
||||
float absoluteMaxCapacity;
|
||||
float absoluteUsedCapacity;
|
||||
int numApplications;
|
||||
int maxParallelApps;
|
||||
String queueName;
|
||||
private String queuePath;
|
||||
String state;
|
||||
boolean isAbsoluteResource;
|
||||
boolean autoCreateChildQueueEnabled;
|
||||
|
||||
public String getQueuePath() {
|
||||
return queuePath;
|
||||
}
|
||||
}
|
||||
|
||||
private class LeafQueueInfo extends QueueInfo {
|
||||
int numActiveApplications;
|
||||
int numPendingApplications;
|
||||
int numContainers;
|
||||
int maxApplications;
|
||||
int maxApplicationsPerUser;
|
||||
float userLimit;
|
||||
float userLimitFactor;
|
||||
long defaultApplicationLifetime;
|
||||
long maxApplicationLifetime;
|
||||
}
|
||||
|
||||
private static class WebServletModule extends ServletModule {
|
||||
@Override
|
||||
|
@ -114,11 +85,12 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
bind(JAXBContextResolver.class);
|
||||
bind(RMWebServices.class);
|
||||
bind(GenericExceptionHandler.class);
|
||||
csConf = new CapacitySchedulerConfiguration();
|
||||
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(
|
||||
new Configuration(false));
|
||||
setupQueueConfiguration(csConf);
|
||||
conf = new YarnConfiguration(csConf);
|
||||
YarnConfiguration conf = new YarnConfiguration(csConf);
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
ResourceScheduler.class);
|
||||
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
||||
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
||||
rm = new MockRM(conf);
|
||||
|
@ -127,74 +99,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
static {
|
||||
GuiceServletConfig.setInjector(
|
||||
Guice.createInjector(new WebServletModule()));
|
||||
}
|
||||
|
||||
private static void setupQueueConfiguration(
|
||||
CapacitySchedulerConfiguration config) {
|
||||
|
||||
// Define top-level queues
|
||||
config.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a", "b"});
|
||||
|
||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
config.setCapacity(A, 10.5f);
|
||||
config.setMaximumCapacity(A, 50);
|
||||
config.setInt(CapacitySchedulerConfiguration.getQueuePrefix(A) + MAX_PARALLEL_APPLICATIONS, 42);
|
||||
|
||||
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
config.setCapacity(B, 89.5f);
|
||||
|
||||
final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
config.setCapacity(C, "[memory=1024]");
|
||||
|
||||
// Define 2nd-level queues
|
||||
final String A1 = A + ".a1";
|
||||
final String A2 = A + ".a2";
|
||||
config.setQueues(A, new String[] {"a1", "a2"});
|
||||
config.setCapacity(A1, 30);
|
||||
config.setMaximumCapacity(A1, 50);
|
||||
config.setMaximumLifetimePerQueue(A2, 100);
|
||||
config.setDefaultLifetimePerQueue(A2, 50);
|
||||
|
||||
config.setUserLimitFactor(A1, 100.0f);
|
||||
config.setCapacity(A2, 70);
|
||||
config.setUserLimitFactor(A2, 100.0f);
|
||||
|
||||
final String B1 = B + ".b1";
|
||||
final String B2 = B + ".b2";
|
||||
final String B3 = B + ".b3";
|
||||
config.setQueues(B, new String[] {"b1", "b2", "b3"});
|
||||
config.setCapacity(B1, 60);
|
||||
config.setUserLimitFactor(B1, 100.0f);
|
||||
config.setCapacity(B2, 39.5f);
|
||||
config.setUserLimitFactor(B2, 100.0f);
|
||||
config.setCapacity(B3, 0.5f);
|
||||
config.setUserLimitFactor(B3, 100.0f);
|
||||
|
||||
config.setQueues(A1, new String[] {"a1a", "a1b", "a1c"});
|
||||
final String A1A = A1 + ".a1a";
|
||||
config.setCapacity(A1A, 65);
|
||||
final String A1B = A1 + ".a1b";
|
||||
config.setCapacity(A1B, 15);
|
||||
final String A1C = A1 + ".a1c";
|
||||
config.setCapacity(A1C, 20);
|
||||
|
||||
config.setAutoCreateChildQueueEnabled(A1C, true);
|
||||
config.setInt(PREFIX + A1C + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX
|
||||
+ DOT + CAPACITY, 50);
|
||||
}
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
GuiceServletConfig.setInjector(
|
||||
Guice.createInjector(new WebServletModule()));
|
||||
}
|
||||
|
||||
public TestRMWebServicesCapacitySched() {
|
||||
super(new WebAppDescriptor.Builder(
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.webapp")
|
||||
|
@ -203,399 +107,103 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
.contextPath("jersey-guice-filter").servletPath("/").build());
|
||||
}
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
GuiceServletConfig.setInjector(
|
||||
Guice.createInjector(new WebServletModule()));
|
||||
}
|
||||
|
||||
public static void setupQueueConfiguration(
|
||||
CapacitySchedulerConfiguration config) {
|
||||
|
||||
// Define top-level queues
|
||||
config.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a", "b"});
|
||||
|
||||
final String a = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
config.setCapacity(a, 10.5f);
|
||||
config.setMaximumCapacity(a, 50);
|
||||
config.setInt(CapacitySchedulerConfiguration.getQueuePrefix(a) + MAX_PARALLEL_APPLICATIONS, 42);
|
||||
|
||||
final String b = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
config.setCapacity(b, 89.5f);
|
||||
|
||||
final String c = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
config.setCapacity(c, "[memory=1024]");
|
||||
|
||||
// Define 2nd-level queues
|
||||
final String a1 = a + ".a1";
|
||||
final String a2 = a + ".a2";
|
||||
config.setQueues(a, new String[] {"a1", "a2"});
|
||||
config.setCapacity(a1, 30);
|
||||
config.setMaximumCapacity(a1, 50);
|
||||
config.setMaximumLifetimePerQueue(a2, 100);
|
||||
config.setDefaultLifetimePerQueue(a2, 50);
|
||||
|
||||
config.setUserLimitFactor(a1, 100.0f);
|
||||
config.setCapacity(a2, 70);
|
||||
config.setUserLimitFactor(a2, 100.0f);
|
||||
|
||||
final String b1 = b + ".b1";
|
||||
final String b2 = b + ".b2";
|
||||
final String b3 = b + ".b3";
|
||||
config.setQueues(b, new String[] {"b1", "b2", "b3"});
|
||||
config.setCapacity(b1, 60);
|
||||
config.setUserLimitFactor(b1, 100.0f);
|
||||
config.setCapacity(b2, 39.5f);
|
||||
config.setUserLimitFactor(b2, 100.0f);
|
||||
config.setCapacity(b3, 0.5f);
|
||||
config.setUserLimitFactor(b3, 100.0f);
|
||||
|
||||
config.setQueues(a1, new String[] {"a1a", "a1b", "a1c"});
|
||||
final String a1A = a1 + ".a1a";
|
||||
config.setCapacity(a1A, 65);
|
||||
final String a1B = a1 + ".a1b";
|
||||
config.setCapacity(a1B, 15);
|
||||
final String a1C = a1 + ".a1c";
|
||||
config.setCapacity(a1C, 20);
|
||||
|
||||
config.setAutoCreateChildQueueEnabled(a1C, true);
|
||||
config.setInt(PREFIX + a1C + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX
|
||||
+ DOT + CAPACITY, 50);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterScheduler() throws JSONException, Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
public void testClusterScheduler() throws Exception {
|
||||
ClientResponse response = resource().path("ws").path("v1").path("cluster")
|
||||
.path("scheduler").accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
verifyClusterScheduler(json);
|
||||
assertJsonResponse(response, "webapp/scheduler-response.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterSchedulerSlash() throws JSONException, Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
public void testClusterSchedulerSlash() throws Exception {
|
||||
ClientResponse response = resource().path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/").accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
verifyClusterScheduler(json);
|
||||
assertJsonResponse(response, "webapp/scheduler-response.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterSchedulerDefault() throws JSONException, Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
public void testClusterSchedulerDefault() throws Exception {
|
||||
ClientResponse response = resource().path("ws").path("v1").path("cluster")
|
||||
.path("scheduler").get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
verifyClusterScheduler(json);
|
||||
assertJsonResponse(response, "webapp/scheduler-response.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClusterSchedulerXML() throws JSONException, Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
public void testClusterSchedulerXML() throws Exception {
|
||||
ClientResponse response = resource().path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/").accept(MediaType.APPLICATION_XML)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_XML_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
String xml = response.getEntity(String.class);
|
||||
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
|
||||
DocumentBuilder db = dbf.newDocumentBuilder();
|
||||
InputSource is = new InputSource();
|
||||
is.setCharacterStream(new StringReader(xml));
|
||||
Document dom = db.parse(is);
|
||||
NodeList scheduler = dom.getElementsByTagName("scheduler");
|
||||
assertEquals("incorrect number of elements", 1, scheduler.getLength());
|
||||
NodeList schedulerInfo = dom.getElementsByTagName("schedulerInfo");
|
||||
assertEquals("incorrect number of elements", 1, schedulerInfo.getLength());
|
||||
verifyClusterSchedulerXML(schedulerInfo);
|
||||
assertXmlResponse(response, "webapp/scheduler-response.xml");
|
||||
}
|
||||
|
||||
public void verifyClusterSchedulerXML(NodeList nodes) throws Exception {
|
||||
|
||||
for (int i = 0; i < nodes.getLength(); i++) {
|
||||
Element element = (Element) nodes.item(i);
|
||||
|
||||
verifyClusterSchedulerGeneric(
|
||||
WebServicesTestUtils.getXmlAttrString(element, "xsi:type"),
|
||||
WebServicesTestUtils.getXmlFloat(element, "usedCapacity"),
|
||||
WebServicesTestUtils.getXmlFloat(element, "capacity"),
|
||||
WebServicesTestUtils.getXmlFloat(element, "maxCapacity"),
|
||||
WebServicesTestUtils.getXmlString(element, "queueName"),
|
||||
WebServicesTestUtils.getXmlString(element, "queuePath"),
|
||||
WebServicesTestUtils.getXmlInt(element, "maxParallelApps"));
|
||||
|
||||
NodeList children = element.getChildNodes();
|
||||
for (int j = 0; j < children.getLength(); j++) {
|
||||
Element qElem = (Element) children.item(j);
|
||||
if(qElem.getTagName().equals("queues")) {
|
||||
NodeList qListInfos = qElem.getChildNodes();
|
||||
for (int k = 0; k < qListInfos.getLength(); k++) {
|
||||
Element qElem2 = (Element) qListInfos.item(k);
|
||||
String qName2 = WebServicesTestUtils.getXmlString(qElem2, "queueName");
|
||||
String q2 = CapacitySchedulerConfiguration.ROOT + "." + qName2;
|
||||
verifySubQueueXML(qElem2, q2, 100, 100);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void verifySubQueueXML(Element qElem, String q,
|
||||
float parentAbsCapacity, float parentAbsMaxCapacity)
|
||||
throws Exception {
|
||||
NodeList children = qElem.getChildNodes();
|
||||
boolean hasSubQueues = false;
|
||||
for (int j = 0; j < children.getLength(); j++) {
|
||||
Element qElem2 = (Element) children.item(j);
|
||||
if(qElem2.getTagName().equals("queues")) {
|
||||
NodeList qListInfos = qElem2.getChildNodes();
|
||||
if (qListInfos.getLength() > 0) {
|
||||
hasSubQueues = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
QueueInfo qi = (hasSubQueues) ? new QueueInfo() : new LeafQueueInfo();
|
||||
qi.capacity = WebServicesTestUtils.getXmlFloat(qElem, "capacity");
|
||||
qi.usedCapacity =
|
||||
WebServicesTestUtils.getXmlFloat(qElem, "usedCapacity");
|
||||
qi.maxCapacity = WebServicesTestUtils.getXmlFloat(qElem, "maxCapacity");
|
||||
qi.absoluteCapacity = WebServicesTestUtils.getXmlFloat(qElem, "absoluteCapacity");
|
||||
qi.absoluteMaxCapacity =
|
||||
WebServicesTestUtils.getXmlFloat(qElem, "absoluteMaxCapacity");
|
||||
qi.absoluteUsedCapacity =
|
||||
WebServicesTestUtils.getXmlFloat(qElem, "absoluteUsedCapacity");
|
||||
qi.numApplications =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "numApplications");
|
||||
qi.maxParallelApps =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "maxParallelApps");
|
||||
qi.queueName = WebServicesTestUtils.getXmlString(qElem, "queueName");
|
||||
qi.queuePath = WebServicesTestUtils.getXmlString(qElem, "queuePath");
|
||||
qi.state = WebServicesTestUtils.getXmlString(qElem, "state");
|
||||
qi.autoCreateChildQueueEnabled = WebServicesTestUtils.getXmlBoolean(qElem,
|
||||
"autoCreateChildQueueEnabled");
|
||||
qi.isAbsoluteResource = WebServicesTestUtils.getXmlBoolean(qElem,
|
||||
"isAbsoluteResource");
|
||||
verifySubQueueGeneric(q, qi, parentAbsCapacity, parentAbsMaxCapacity);
|
||||
if (hasSubQueues) {
|
||||
for (int j = 0; j < children.getLength(); j++) {
|
||||
Element qElem2 = (Element) children.item(j);
|
||||
if(qElem2.getTagName().equals("queues")) {
|
||||
NodeList qListInfos = qElem2.getChildNodes();
|
||||
for (int k = 0; k < qListInfos.getLength(); k++) {
|
||||
Element qElem3 = (Element) qListInfos.item(k);
|
||||
String qName3 = WebServicesTestUtils.getXmlString(qElem3, "queueName");
|
||||
String q3 = q + "." + qName3;
|
||||
verifySubQueueXML(qElem3, q3, qi.absoluteCapacity, qi.absoluteMaxCapacity);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (qi.autoCreateChildQueueEnabled) {
|
||||
assertEquals("queueName doesn't match", "a1c", qi.queueName);
|
||||
String capacityStr = WebServicesTestUtils.getPropertyValue(qElem,
|
||||
"leafQueueTemplate", AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX
|
||||
+ DOT + CAPACITY);
|
||||
int capacity = Integer.parseInt(capacityStr);
|
||||
assertEquals(AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX + DOT
|
||||
+ CAPACITY + " doesn't match", 50, capacity);
|
||||
} else {
|
||||
LeafQueueInfo lqi = (LeafQueueInfo) qi;
|
||||
lqi.numActiveApplications =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "numActiveApplications");
|
||||
lqi.numPendingApplications =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "numPendingApplications");
|
||||
lqi.numContainers =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "numContainers");
|
||||
lqi.maxApplications =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
|
||||
lqi.maxApplicationsPerUser =
|
||||
WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
|
||||
lqi.userLimit = WebServicesTestUtils.getXmlFloat(qElem, "userLimit");
|
||||
lqi.userLimitFactor =
|
||||
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
|
||||
lqi.defaultApplicationLifetime =
|
||||
WebServicesTestUtils.getXmlLong(qElem, "defaultApplicationLifetime");
|
||||
lqi.maxApplicationLifetime =
|
||||
WebServicesTestUtils.getXmlLong(qElem, "maxApplicationLifetime");
|
||||
verifyLeafQueueGeneric(q, lqi);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyClusterScheduler(JSONObject json) throws JSONException,
|
||||
Exception {
|
||||
assertEquals("incorrect number of elements in: " + json, 1, json.length());
|
||||
JSONObject info = json.getJSONObject("scheduler");
|
||||
assertEquals("incorrect number of elements in: " + info, 1, info.length());
|
||||
info = info.getJSONObject("schedulerInfo");
|
||||
assertEquals("incorrect number of elements in: " + info, 23, info.length());
|
||||
verifyClusterSchedulerGeneric(info.getString("type"),
|
||||
(float) info.getDouble("usedCapacity"),
|
||||
(float) info.getDouble("capacity"),
|
||||
(float) info.getDouble("maxCapacity"),
|
||||
info.getString("queueName"),
|
||||
info.getString("queuePath"),
|
||||
info.getInt("maxParallelApps"));
|
||||
JSONObject health = info.getJSONObject("health");
|
||||
assertNotNull(health);
|
||||
assertEquals("incorrect number of elements in: " + health, 3,
|
||||
health.length());
|
||||
JSONArray operationsInfo = health.getJSONArray("operationsInfo");
|
||||
assertEquals("incorrect number of elements in: " + health, 4,
|
||||
operationsInfo.length());
|
||||
JSONArray lastRunDetails = health.getJSONArray("lastRunDetails");
|
||||
assertEquals("incorrect number of elements in: " + health, 3,
|
||||
lastRunDetails.length());
|
||||
|
||||
JSONObject maximumAllocation = info.getJSONObject("maximumAllocation");
|
||||
assertEquals("8192", maximumAllocation.getString("memory"));
|
||||
assertEquals("4", maximumAllocation.getString("vCores"));
|
||||
|
||||
JSONObject queueAcls = info.getJSONObject("queueAcls");
|
||||
assertEquals(1, queueAcls.length());
|
||||
|
||||
assertEquals("0", info.getString("queuePriority"));
|
||||
assertEquals("utilization", info.getString("orderingPolicyInfo"));
|
||||
|
||||
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
|
||||
assertEquals("incorrect number of elements in: " + arr, 2, arr.length());
|
||||
|
||||
// test subqueues
|
||||
for (int i = 0; i < arr.length(); i++) {
|
||||
JSONObject obj = arr.getJSONObject(i);
|
||||
String q = CapacitySchedulerConfiguration.ROOT + "." +
|
||||
obj.getString("queueName");
|
||||
verifySubQueue(obj, q, 100, 100);
|
||||
}
|
||||
}
|
||||
|
||||
private void verifyClusterSchedulerGeneric(String type, float usedCapacity,
|
||||
float capacity, float maxCapacity, String queueName, String queuePath, int maxParallelApps)
|
||||
throws Exception {
|
||||
|
||||
assertTrue("type doesn't match", "capacityScheduler".matches(type));
|
||||
assertEquals("usedCapacity doesn't match", 0, usedCapacity, 1e-3f);
|
||||
assertEquals("capacity doesn't match", 100, capacity, 1e-3f);
|
||||
assertEquals("maxCapacity doesn't match", 100, maxCapacity, 1e-3f);
|
||||
assertTrue("queueName doesn't match", "root".matches(queueName));
|
||||
assertTrue("queuePath doesn't match", "root".matches(queuePath));
|
||||
assertEquals("maxParallelApps doesn't match ", Integer.MAX_VALUE, maxParallelApps);
|
||||
}
|
||||
|
||||
private void verifySubQueue(JSONObject info, String q,
|
||||
float parentAbsCapacity, float parentAbsMaxCapacity)
|
||||
throws JSONException, Exception {
|
||||
int numExpectedElements = 38;
|
||||
boolean isParentQueue = true;
|
||||
if (!info.has("queues")) {
|
||||
numExpectedElements = 56;
|
||||
isParentQueue = false;
|
||||
}
|
||||
assertEquals("incorrect number of elements", numExpectedElements, info.length());
|
||||
|
||||
QueueInfo qi = isParentQueue ? new QueueInfo() : new LeafQueueInfo();
|
||||
qi.capacity = (float) info.getDouble("capacity");
|
||||
qi.usedCapacity = (float) info.getDouble("usedCapacity");
|
||||
qi.maxCapacity = (float) info.getDouble("maxCapacity");
|
||||
qi.absoluteCapacity = (float) info.getDouble("absoluteCapacity");
|
||||
qi.absoluteMaxCapacity = (float) info.getDouble("absoluteMaxCapacity");
|
||||
qi.absoluteUsedCapacity = (float) info.getDouble("absoluteUsedCapacity");
|
||||
qi.numApplications = info.getInt("numApplications");
|
||||
qi.maxParallelApps = info.getInt("maxParallelApps");
|
||||
qi.queueName = info.getString("queueName");
|
||||
qi.queuePath = info.getString("queuePath");
|
||||
qi.state = info.getString("state");
|
||||
|
||||
verifySubQueueGeneric(q, qi, parentAbsCapacity, parentAbsMaxCapacity);
|
||||
|
||||
// Separate Condition for Managed Parent Queue
|
||||
if (qi.queueName.equals("a1c")) {
|
||||
assertTrue(info.getBoolean("autoCreateChildQueueEnabled"));
|
||||
} else if (isParentQueue) {
|
||||
JSONArray arr = info.getJSONObject("queues").getJSONArray("queue");
|
||||
// test subqueues
|
||||
for (int i = 0; i < arr.length(); i++) {
|
||||
JSONObject obj = arr.getJSONObject(i);
|
||||
String q2 = q + "." + obj.getString("queueName");
|
||||
verifySubQueue(obj, q2, qi.absoluteCapacity, qi.absoluteMaxCapacity);
|
||||
}
|
||||
|
||||
JSONObject maximumAllocation = info.getJSONObject("maximumAllocation");
|
||||
assertEquals("8192", maximumAllocation.getString("memory"));
|
||||
assertEquals("4", maximumAllocation.getString("vCores"));
|
||||
|
||||
JSONObject queueAcls = info.getJSONObject("queueAcls");
|
||||
assertEquals(1, queueAcls.length());
|
||||
|
||||
assertEquals("0", info.getString("queuePriority"));
|
||||
assertEquals("utilization", info.getString("orderingPolicyInfo"));
|
||||
assertFalse(info.getBoolean("autoCreateChildQueueEnabled"));
|
||||
} else {
|
||||
Assert.assertEquals("\"type\" field is incorrect",
|
||||
"capacitySchedulerLeafQueueInfo", info.getString("type"));
|
||||
LeafQueueInfo lqi = (LeafQueueInfo) qi;
|
||||
lqi.numActiveApplications = info.getInt("numActiveApplications");
|
||||
lqi.numPendingApplications = info.getInt("numPendingApplications");
|
||||
lqi.numContainers = info.getInt("numContainers");
|
||||
lqi.maxApplications = info.getInt("maxApplications");
|
||||
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
|
||||
lqi.userLimit = (float) info.getDouble("userLimit");
|
||||
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
|
||||
lqi.defaultApplicationLifetime =
|
||||
info.getLong("defaultApplicationLifetime");
|
||||
lqi.maxApplicationLifetime = info.getLong("maxApplicationLifetime");
|
||||
verifyLeafQueueGeneric(q, lqi);
|
||||
// resourcesUsed and users (per-user resources used) are checked in
|
||||
// testPerUserResource()
|
||||
}
|
||||
}
|
||||
|
||||
private void verifySubQueueGeneric(String q, QueueInfo info,
|
||||
float parentAbsCapacity, float parentAbsMaxCapacity) throws Exception {
|
||||
String[] qArr = q.split("\\.");
|
||||
assertTrue("q name invalid: " + q, qArr.length > 1);
|
||||
String qshortName = qArr[qArr.length - 1];
|
||||
|
||||
assertEquals("usedCapacity doesn't match", 0, info.usedCapacity, 1e-3f);
|
||||
assertEquals("capacity doesn't match", csConf.getNonLabeledQueueCapacity(q),
|
||||
info.capacity, 1e-3f);
|
||||
float expectCapacity = csConf.getNonLabeledQueueMaximumCapacity(q);
|
||||
float expectAbsMaxCapacity = parentAbsMaxCapacity * (info.maxCapacity/100);
|
||||
if (CapacitySchedulerConfiguration.UNDEFINED == expectCapacity) {
|
||||
expectCapacity = 100;
|
||||
expectAbsMaxCapacity = 100;
|
||||
}
|
||||
assertEquals("maxCapacity doesn't match", expectCapacity,
|
||||
info.maxCapacity, 1e-3f);
|
||||
assertEquals("absoluteCapacity doesn't match",
|
||||
parentAbsCapacity * (info.capacity/100), info.absoluteCapacity, 1e-3f);
|
||||
assertEquals("absoluteMaxCapacity doesn't match",
|
||||
expectAbsMaxCapacity, info.absoluteMaxCapacity, 1e-3f);
|
||||
assertEquals("absoluteUsedCapacity doesn't match",
|
||||
0, info.absoluteUsedCapacity, 1e-3f);
|
||||
assertEquals("numApplications doesn't match", 0, info.numApplications);
|
||||
assertTrue("queueName doesn't match, got: " + info.queueName
|
||||
+ " expected: " + q, qshortName.matches(info.queueName));
|
||||
assertTrue("queuePath doesn't match, got: " + info.getQueuePath()
|
||||
+ " expected: " + q, q.matches(info.getQueuePath()));
|
||||
assertTrue("state doesn't match",
|
||||
(csConf.getState(q).toString()).matches(info.state));
|
||||
if (q.equals("c")) {
|
||||
assertTrue("c queue is not configured in Absolute resource",
|
||||
info.isAbsoluteResource);
|
||||
} else {
|
||||
assertFalse(info.queueName
|
||||
+ " queue is not configured in Absolute resource",
|
||||
info.isAbsoluteResource);
|
||||
}
|
||||
assertEquals("maxParallelApps doesn't match " + q,
|
||||
(q.equals("root.a") ? 42 : Integer.MAX_VALUE),
|
||||
info.maxParallelApps);
|
||||
}
|
||||
|
||||
private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)
|
||||
throws Exception {
|
||||
assertEquals("numActiveApplications doesn't match",
|
||||
0, info.numActiveApplications);
|
||||
assertEquals("numPendingApplications doesn't match",
|
||||
0, info.numPendingApplications);
|
||||
assertEquals("numContainers doesn't match",
|
||||
0, info.numContainers);
|
||||
|
||||
int maxSystemApps = csConf.getMaximumSystemApplications();
|
||||
int expectedMaxApps = (int)(maxSystemApps * (info.absoluteCapacity/100));
|
||||
int expectedMaxAppsPerUser = Math.min(expectedMaxApps,
|
||||
(int)(expectedMaxApps * (info.userLimit/100.0f) *
|
||||
info.userLimitFactor));
|
||||
|
||||
// TODO: would like to use integer comparisons here but can't due to
|
||||
// roundoff errors in absolute capacity calculations
|
||||
assertEquals("maxApplications doesn't match",
|
||||
(float)expectedMaxApps, (float)info.maxApplications, 1.0f);
|
||||
assertEquals("maxApplicationsPerUser doesn't match",
|
||||
(float)expectedMaxAppsPerUser,
|
||||
(float)info.maxApplicationsPerUser, info.userLimitFactor);
|
||||
|
||||
assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
|
||||
info.userLimit, 1e-3f);
|
||||
assertEquals("userLimitFactor doesn't match",
|
||||
csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);
|
||||
|
||||
if (q.equals("root.a.a2")) {
|
||||
assertEquals("defaultApplicationLifetime doesn't match",
|
||||
csConf.getDefaultLifetimePerQueue(q),
|
||||
info.defaultApplicationLifetime);
|
||||
assertEquals("maxApplicationLifetime doesn't match",
|
||||
csConf.getMaximumLifetimePerQueue(q),
|
||||
info.maxApplicationLifetime);
|
||||
}
|
||||
}
|
||||
|
||||
//Return a child Node of node with the tagname or null if none exists
|
||||
private Node getChildNodeByName(Node node, String tagname) {
|
||||
NodeList nodeList = node.getChildNodes();
|
||||
for (int i=0; i < nodeList.getLength(); ++i) {
|
||||
if (nodeList.item(i).getNodeName().equals(tagname)) {
|
||||
return nodeList.item(i);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test per user resources and resourcesUsed elements in the web services XML
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testPerUserResourcesXML() throws Exception {
|
||||
//Start RM so that it accepts app submissions
|
||||
// Start RM so that it accepts app submissions
|
||||
rm.start();
|
||||
try {
|
||||
MockRMAppSubmissionData data1 =
|
||||
|
@ -618,55 +226,9 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
MockRMAppSubmitter.submit(rm, data);
|
||||
|
||||
//Get the XML from ws/v1/cluster/scheduler
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws/v1/cluster/scheduler")
|
||||
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_XML_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
String xml = response.getEntity(String.class);
|
||||
DocumentBuilder db = DocumentBuilderFactory.newInstance()
|
||||
.newDocumentBuilder();
|
||||
InputSource is = new InputSource();
|
||||
is.setCharacterStream(new StringReader(xml));
|
||||
//Parse the XML we got
|
||||
Document dom = db.parse(is);
|
||||
|
||||
//Get all users elements (1 for each leaf queue)
|
||||
NodeList allUsers = dom.getElementsByTagName("users");
|
||||
for (int i=0; i<allUsers.getLength(); ++i) {
|
||||
Node perUserResources = allUsers.item(i);
|
||||
String queueName = getChildNodeByName(perUserResources
|
||||
.getParentNode(), "queueName").getTextContent();
|
||||
if (queueName.equals("b1")) {
|
||||
//b1 should have two users (user1 and user2) which submitted jobs
|
||||
assertEquals(2, perUserResources.getChildNodes().getLength());
|
||||
NodeList users = perUserResources.getChildNodes();
|
||||
for (int j=0; j<users.getLength(); ++j) {
|
||||
Node user = users.item(j);
|
||||
String username = getChildNodeByName(user, "username")
|
||||
.getTextContent();
|
||||
assertTrue(username.equals("user1") || username.equals("user2"));
|
||||
//Should be a parsable integer
|
||||
Integer.parseInt(getChildNodeByName(getChildNodeByName(user,
|
||||
"resourcesUsed"), "memory").getTextContent());
|
||||
Integer.parseInt(getChildNodeByName(user, "numActiveApplications")
|
||||
.getTextContent());
|
||||
Integer.parseInt(getChildNodeByName(user, "numPendingApplications")
|
||||
.getTextContent());
|
||||
}
|
||||
} else {
|
||||
//Queues other than b1 should have 0 users
|
||||
assertEquals(0, perUserResources.getChildNodes().getLength());
|
||||
}
|
||||
}
|
||||
NodeList allResourcesUsed = dom.getElementsByTagName("resourcesUsed");
|
||||
for (int i=0; i<allResourcesUsed.getLength(); ++i) {
|
||||
Node resourcesUsed = allResourcesUsed.item(i);
|
||||
Integer.parseInt(getChildNodeByName(resourcesUsed, "memory")
|
||||
.getTextContent());
|
||||
Integer.parseInt(getChildNodeByName(resourcesUsed, "vCores")
|
||||
.getTextContent());
|
||||
}
|
||||
ClientResponse response = resource().path("ws/v1/cluster/scheduler")
|
||||
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
|
||||
assertXmlResponse(response, "webapp/scheduler-response-PerUserResources.xml");
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
|
@ -685,63 +247,14 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
rm.start();
|
||||
try {
|
||||
//Get the XML from ws/v1/cluster/scheduler
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws/v1/cluster/scheduler")
|
||||
ClientResponse response = resource().path("ws/v1/cluster/scheduler")
|
||||
.accept(MediaType.APPLICATION_XML).get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_XML_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
String xml = response.getEntity(String.class);
|
||||
DocumentBuilder db = DocumentBuilderFactory.newInstance()
|
||||
.newDocumentBuilder();
|
||||
InputSource is = new InputSource();
|
||||
is.setCharacterStream(new StringReader(xml));
|
||||
//Parse the XML we got
|
||||
Document dom = db.parse(is);
|
||||
|
||||
NodeList allQueues = dom.getElementsByTagName("queue");
|
||||
for (int i = 0; i < allQueues.getLength(); ++i) {
|
||||
Node queueNode = allQueues.item(i);
|
||||
Node queuePathNode = getChildNodeByName(queueNode, "queuePath");
|
||||
if (queuePathNode == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
String queuePath = queuePathNode.getTextContent();
|
||||
if (queuePath != null) {
|
||||
if (queuePath.startsWith("root.a")) {
|
||||
assertEquals("root-a-default-label",
|
||||
getChildNodeByName(queueNode, "defaultNodeLabelExpression")
|
||||
.getTextContent());
|
||||
} else {
|
||||
assertEquals("ROOT-INHERITED",
|
||||
getChildNodeByName(queueNode, "defaultNodeLabelExpression")
|
||||
.getTextContent());
|
||||
}
|
||||
}
|
||||
}
|
||||
assertXmlResponse(response, "webapp/scheduler-response-NodeLabelDefaultAPI.xml");
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
private void checkResourcesUsed(JSONObject queue) throws JSONException {
|
||||
queue.getJSONObject("resourcesUsed").getInt("memory");
|
||||
queue.getJSONObject("resourcesUsed").getInt("vCores");
|
||||
}
|
||||
|
||||
//Also checks resourcesUsed
|
||||
private JSONObject getSubQueue(JSONObject queue, String subQueue)
|
||||
throws JSONException {
|
||||
JSONArray queues = queue.getJSONObject("queues").getJSONArray("queue");
|
||||
for (int i=0; i<queues.length(); ++i) {
|
||||
checkResourcesUsed(queues.getJSONObject(i));
|
||||
if (queues.getJSONObject(i).getString("queueName").equals(subQueue) ) {
|
||||
return queues.getJSONObject(i);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPerUserResourcesJSON() throws Exception {
|
||||
//Start RM so that it accepts app submissions
|
||||
|
@ -767,48 +280,113 @@ public class TestRMWebServicesCapacitySched extends JerseyTestBase {
|
|||
MockRMAppSubmitter.submit(rm, data);
|
||||
|
||||
//Get JSON
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
ClientResponse response = resource().path("ws").path("v1").path("cluster")
|
||||
.path("scheduler/").accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
|
||||
JSONObject schedulerInfo = json.getJSONObject("scheduler").getJSONObject(
|
||||
"schedulerInfo");
|
||||
JSONObject b1 = getSubQueue(getSubQueue(schedulerInfo, "b"), "b1");
|
||||
//Check users user1 and user2 exist in b1
|
||||
JSONArray users = b1.getJSONObject("users").getJSONArray("user");
|
||||
for (int i=0; i<2; ++i) {
|
||||
JSONObject user = users.getJSONObject(i);
|
||||
assertTrue("User isn't user1 or user2",user.getString("username")
|
||||
.equals("user1") || user.getString("username").equals("user2"));
|
||||
user.getInt("numActiveApplications");
|
||||
user.getInt("numPendingApplications");
|
||||
checkResourcesUsed(user);
|
||||
}
|
||||
|
||||
// Verify 'queues' field is omitted from CapacitySchedulerLeafQueueInfo.
|
||||
try {
|
||||
b1.getJSONObject("queues");
|
||||
fail("CapacitySchedulerQueueInfo should omit field 'queues'" +
|
||||
"if child queue is empty.");
|
||||
} catch (JSONException je) {
|
||||
assertEquals("JSONObject[\"queues\"] not found.", je.getMessage());
|
||||
}
|
||||
assertJsonResponse(response, "webapp/scheduler-response-PerUserResources.json");
|
||||
} finally {
|
||||
rm.stop();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testResourceInfo() {
|
||||
Resource res = Resources.createResource(10, 1);
|
||||
// If we add a new resource (e.g disks), then
|
||||
// If we add a new resource (e.g. disks), then
|
||||
// CapacitySchedulerPage and these RM WebServices + docs need to be updated
|
||||
// eg. ResourceInfo
|
||||
// e.g. ResourceInfo
|
||||
assertEquals("<memory:10, vCores:1>", res.toString());
|
||||
}
|
||||
|
||||
public static void assertXmlType(ClientResponse response) {
|
||||
assertEquals(MediaType.APPLICATION_XML_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
}
|
||||
|
||||
public static void assertXmlResponse(ClientResponse response,
|
||||
String expectedResourceFilename) throws
|
||||
Exception {
|
||||
assertXmlType(response);
|
||||
Document document = loadDocument(response.getEntity(String.class));
|
||||
String actual = serializeDocument(document).trim();
|
||||
updateTestDataAutomatically(expectedResourceFilename, actual);
|
||||
assertEquals(getResourceAsString(expectedResourceFilename), actual);
|
||||
}
|
||||
|
||||
public static String serializeDocument(Document document) throws TransformerException {
|
||||
DOMSource domSource = new DOMSource(document);
|
||||
StringWriter writer = new StringWriter();
|
||||
StreamResult result = new StreamResult(writer);
|
||||
TransformerFactory tf = TransformerFactory.newInstance();
|
||||
Transformer transformer = tf.newTransformer();
|
||||
transformer.setOutputProperty(OutputKeys.INDENT, "yes");
|
||||
transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
|
||||
transformer.transform(domSource, result);
|
||||
return writer.toString();
|
||||
}
|
||||
|
||||
public static Document loadDocument(String xml) throws Exception {
|
||||
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
|
||||
DocumentBuilder builder = factory.newDocumentBuilder();
|
||||
InputSource is = new InputSource(new StringReader(xml));
|
||||
return builder.parse(is);
|
||||
}
|
||||
|
||||
public static void assertJsonResponse(ClientResponse response,
|
||||
String expectedResourceFilename) throws
|
||||
JSONException, IOException {
|
||||
assertJsonType(response);
|
||||
JSONObject json = response.getEntity(JSONObject.class);
|
||||
String actual = json.toString(2);
|
||||
updateTestDataAutomatically(expectedResourceFilename, actual);
|
||||
assertEquals(getResourceAsString(expectedResourceFilename), actual);
|
||||
}
|
||||
|
||||
public static void assertJsonType(ClientResponse response) {
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
}
|
||||
|
||||
public static InputStream getResourceAsStream(String configFilename) {
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
return classLoader.getResourceAsStream(configFilename);
|
||||
}
|
||||
|
||||
public static String getResourceAsString(String configFilename) throws IOException {
|
||||
try (InputStream is = getResourceAsStream(configFilename)) {
|
||||
if (is == null) {
|
||||
return null;
|
||||
}
|
||||
try (InputStreamReader isr = new InputStreamReader(is);
|
||||
BufferedReader reader = new BufferedReader(isr)) {
|
||||
return reader.lines().collect(Collectors.joining(System.lineSeparator()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void updateTestDataAutomatically(String configFilename, String actualContent) {
|
||||
/*
|
||||
Set UPDATE_TESTDATA=1 environment variable for auto update the expected data
|
||||
or uncomment this return statement.
|
||||
|
||||
It's safe in a way that, this updates the source directory so the test will still fail,
|
||||
because the target directory is untouched.
|
||||
*/
|
||||
if (System.getenv("UPDATE_TESTDATA") == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
try {
|
||||
String resource = Objects.requireNonNull(
|
||||
Objects.requireNonNull(classLoader.getResource(configFilename)).toURI().getPath())
|
||||
.replaceAll("/target/test-classes/", "/src/test/resources/");
|
||||
try (FileWriter writer = new FileWriter(resource, false)) {
|
||||
writer.write(actualContent);
|
||||
}
|
||||
} catch (URISyntaxException | IOException e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail("overwrite should not fail " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,26 +18,21 @@
|
|||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Comparator;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.http.JettyUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
|
@ -45,88 +40,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCrea
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
import org.codehaus.jettison.json.JSONArray;
|
||||
import org.codehaus.jettison.json.JSONException;
|
||||
import org.codehaus.jettison.json.JSONObject;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.servlet.ServletModule;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.WebResource;
|
||||
import com.sun.jersey.guice.spi.container.servlet.GuiceContainer;
|
||||
import com.sun.jersey.test.framework.WebAppDescriptor;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.webapp.TestRMWebServicesCapacitySched.assertJsonResponse;
|
||||
|
||||
public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
||||
JerseyTestBase {
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(TestRMWebServicesCapacitySchedDynamicConfig.class);
|
||||
private static final float EXP_WEIGHT_NON_WEIGHT_MODE = -1.0F;
|
||||
private static final float EXP_NORM_WEIGHT_NON_WEIGHT_MODE = 0.0F;
|
||||
private static final float EXP_ROOT_WEIGHT_IN_WEIGHT_MODE = 1.0F;
|
||||
private static final float EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE = 1.0F;
|
||||
private static final double DELTA = 0.00001;
|
||||
private static final String PARENT_QUEUE = "parent";
|
||||
private static final String LEAF_QUEUE = "leaf";
|
||||
private static final String STATIC_QUEUE = "static";
|
||||
private static final String FLEXIBLE_DYNAMIC_QUEUE = "dynamicFlexible";
|
||||
private static final String AUTO_CREATION_OFF = "off";
|
||||
private static final String AUTO_CREATION_LEGACY = "legacy";
|
||||
private static final String AUTO_CREATION_FLEXIBLE = "flexible";
|
||||
private static final int GB = 1024;
|
||||
protected static MockRM RM;
|
||||
private static MockRM rm;
|
||||
|
||||
private CapacitySchedulerQueueManager autoQueueHandler;
|
||||
private CapacitySchedulerConfiguration csConf;
|
||||
|
||||
private static class ExpectedQueueWithProperties {
|
||||
private String path;
|
||||
public final float weight;
|
||||
public final float normalizedWeight;
|
||||
private String queueType;
|
||||
private String creationMethod;
|
||||
private String autoCreationEligibility;
|
||||
private List<String[]> autoQueueTemplateProperties;
|
||||
|
||||
public ExpectedQueueWithProperties(String path, float weight,
|
||||
float normalizedWeight, String queueType, String creationMethod,
|
||||
String autoCreationEligibility) {
|
||||
this.path = path;
|
||||
this.weight = weight;
|
||||
this.normalizedWeight = normalizedWeight;
|
||||
this.queueType = queueType;
|
||||
this.creationMethod = creationMethod;
|
||||
this.autoCreationEligibility = autoCreationEligibility;
|
||||
this.autoQueueTemplateProperties = new ArrayList<>();
|
||||
}
|
||||
|
||||
ExpectedQueueWithProperties(
|
||||
String path, float weight, float normalizedWeight, String queueType,
|
||||
String creationMethod, String autoCreationEligibility,
|
||||
List<String[]> autoQueueTemplateProperties) {
|
||||
this.path = path;
|
||||
this.weight = weight;
|
||||
this.normalizedWeight = normalizedWeight;
|
||||
this.queueType = queueType;
|
||||
this.creationMethod = creationMethod;
|
||||
this.autoCreationEligibility = autoCreationEligibility;
|
||||
this.autoQueueTemplateProperties = autoQueueTemplateProperties;
|
||||
}
|
||||
}
|
||||
|
||||
private static class WebServletModule extends ServletModule {
|
||||
private final Configuration conf;
|
||||
|
||||
public WebServletModule(Configuration conf) {
|
||||
WebServletModule(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
|
@ -139,30 +71,22 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
ResourceScheduler.class);
|
||||
conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
|
||||
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
||||
RM = new MockRM(conf);
|
||||
bind(ResourceManager.class).toInstance(RM);
|
||||
rm = new MockRM(conf);
|
||||
bind(ResourceManager.class).toInstance(rm);
|
||||
serve("/*").with(GuiceContainer.class);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
private void initResourceManager(Configuration conf) throws IOException {
|
||||
GuiceServletConfig.setInjector(
|
||||
Guice.createInjector(new WebServletModule(conf)));
|
||||
RM.start();
|
||||
rm.start();
|
||||
//Need to call reinitialize as
|
||||
//MutableCSConfigurationProvider with InMemoryConfigurationStore
|
||||
//somehow does not load the queues properly and falls back to default config.
|
||||
//Therefore CS will think there's only the default queue there.
|
||||
((CapacityScheduler) RM.getResourceScheduler()).reinitialize(conf,
|
||||
RM.getRMContext(), true);
|
||||
CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler();
|
||||
csConf = cs.getConfiguration();
|
||||
((CapacityScheduler) rm.getResourceScheduler()).reinitialize(conf,
|
||||
rm.getRMContext(), true);
|
||||
}
|
||||
|
||||
public TestRMWebServicesCapacitySchedDynamicConfig() {
|
||||
|
@ -182,20 +106,14 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
|
||||
|
||||
initResourceManager(config);
|
||||
JSONObject json = sendRequestToSchedulerEndpoint();
|
||||
validateSchedulerInfo(json, "percentage",
|
||||
new ExpectedQueueWithProperties("root",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.default",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test1",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test2",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF));
|
||||
|
||||
/*
|
||||
* mode: percentage
|
||||
* autoCreationEligibility: off
|
||||
* weight: -1, normalizedWeight: 0
|
||||
* root.queueType: parent, others.queueType: leaf
|
||||
*/
|
||||
assertJsonResponse(sendRequest(), "webapp/scheduler-response-PercentageMode.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -207,20 +125,15 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
|
||||
|
||||
initResourceManager(config);
|
||||
JSONObject json = sendRequestToSchedulerEndpoint();
|
||||
validateSchedulerInfo(json, "percentage",
|
||||
new ExpectedQueueWithProperties("root",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.default",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test1",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.managedtest2",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_LEGACY));
|
||||
|
||||
/*
|
||||
* mode: percentage
|
||||
* managedtest2.autoCreationEligibility: legacy, others.autoCreationEligibility: off
|
||||
* weight: -1, normalizedWeight: 0
|
||||
* root.queueType: parent, others.queueType: leaf
|
||||
*/
|
||||
assertJsonResponse(sendRequest(),
|
||||
"webapp/scheduler-response-PercentageModeLegacyAutoCreation.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -232,20 +145,14 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
|
||||
|
||||
initResourceManager(config);
|
||||
JSONObject json = sendRequestToSchedulerEndpoint();
|
||||
validateSchedulerInfo(json, "absolute",
|
||||
new ExpectedQueueWithProperties("root",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.default",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test1",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test2",
|
||||
EXP_WEIGHT_NON_WEIGHT_MODE, EXP_NORM_WEIGHT_NON_WEIGHT_MODE,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF));
|
||||
|
||||
/*
|
||||
* mode: absolute
|
||||
* autoCreationEligibility: off
|
||||
* weight: -1, normalizedWeight: 0
|
||||
* root.queueType: parent, others.queueType: leaf
|
||||
*/
|
||||
assertJsonResponse(sendRequest(), "webapp/scheduler-response-AbsoluteMode.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -257,17 +164,16 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
YarnConfiguration.MEMORY_CONFIGURATION_STORE);
|
||||
|
||||
initResourceManager(config);
|
||||
JSONObject json = sendRequestToSchedulerEndpoint();
|
||||
validateSchedulerInfo(json, "weight",
|
||||
new ExpectedQueueWithProperties("root",
|
||||
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF));
|
||||
|
||||
/*
|
||||
* mode: weight
|
||||
* autoCreationEligibility: off
|
||||
* root default test1 test2
|
||||
* weight: 1 10 4 6
|
||||
* normalizedWeight: 1 0.5 0.2 0.3
|
||||
* root.queueType: parent, others.queueType: leaf
|
||||
*/
|
||||
assertJsonResponse(sendRequest(), "webapp/scheduler-response-WeightMode.json");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -284,17 +190,10 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
|
||||
initResourceManager(config);
|
||||
initAutoQueueHandler();
|
||||
JSONObject json = sendRequestToSchedulerEndpoint();
|
||||
validateSchedulerInfo(json, "weight",
|
||||
new ExpectedQueueWithProperties("root",
|
||||
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.default", 10.0f, 0.5f,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test1", 4.0f, 0.2f,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test2", 6.0f, 0.3f,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF));
|
||||
|
||||
// same as webapp/scheduler-response-WeightMode.json, but with effective resources filled in
|
||||
assertJsonResponse(sendRequest(),
|
||||
"webapp/scheduler-response-WeightModeWithAutoCreatedQueues-Before.json");
|
||||
|
||||
//Now create some auto created queues
|
||||
createQueue("root.auto1");
|
||||
|
@ -302,242 +201,32 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
createQueue("root.auto3");
|
||||
createQueue("root.autoParent1.auto4");
|
||||
|
||||
json = sendRequestToSchedulerEndpoint();
|
||||
//root.auto1=1w, root.auto2=1w, root.auto3=1w
|
||||
//root.default=10w, root.test1=4w, root.test2=6w
|
||||
//root.autoparent1=1w
|
||||
int sumOfWeights = 24;
|
||||
ExpectedQueueWithProperties expectedRootQ =
|
||||
new ExpectedQueueWithProperties("root",
|
||||
EXP_ROOT_WEIGHT_IN_WEIGHT_MODE, EXP_ROOT_WEIGHT_IN_WEIGHT_MODE,
|
||||
PARENT_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF);
|
||||
List<String[]> templateProperties = new ArrayList<>();
|
||||
templateProperties.add(new String[] {"maximum-applications", "300"});
|
||||
|
||||
validateSchedulerInfo(json, "weight",
|
||||
expectedRootQ,
|
||||
new ExpectedQueueWithProperties("root.auto1",
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
|
||||
LEAF_QUEUE, FLEXIBLE_DYNAMIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.auto2",
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
|
||||
LEAF_QUEUE, FLEXIBLE_DYNAMIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.auto3",
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
|
||||
LEAF_QUEUE, FLEXIBLE_DYNAMIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.autoParent1",
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE / sumOfWeights,
|
||||
PARENT_QUEUE, FLEXIBLE_DYNAMIC_QUEUE, AUTO_CREATION_FLEXIBLE,
|
||||
templateProperties),
|
||||
new ExpectedQueueWithProperties("root.default", 10.0f,
|
||||
10.0f / sumOfWeights,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test1", 4.0f,
|
||||
4.0f / sumOfWeights,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF),
|
||||
new ExpectedQueueWithProperties("root.test2", 6.0f,
|
||||
6.0f / sumOfWeights,
|
||||
LEAF_QUEUE, STATIC_QUEUE, AUTO_CREATION_OFF));
|
||||
|
||||
validateChildrenOfParent(json, "root.autoParent1", "weight",
|
||||
expectedRootQ,
|
||||
new ExpectedQueueWithProperties("root.autoParent1.auto4",
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
|
||||
EXP_DEFAULT_WEIGHT_IN_WEIGHT_MODE,
|
||||
LEAF_QUEUE, FLEXIBLE_DYNAMIC_QUEUE, AUTO_CREATION_OFF));
|
||||
/*
|
||||
* root default test1 test2 autoParent1 auto1 auto2 auto3 auto4
|
||||
* weight: 1 10 4 6 1 1 1 1 1
|
||||
* normalizedWeight: 1 0.41 0.16 0.25 1 0.04 0.04 0.04 0.04
|
||||
* autoCreationEligibility: flexible off off off flexible off off off off
|
||||
* queueType: parent leaf leaf leaf parent leaf leaf leaf leaf
|
||||
*/
|
||||
assertJsonResponse(sendRequest(),
|
||||
"webapp/scheduler-response-WeightModeWithAutoCreatedQueues-After.json");
|
||||
}
|
||||
|
||||
private void initAutoQueueHandler() throws Exception {
|
||||
CapacityScheduler cs = (CapacityScheduler) RM.getResourceScheduler();
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
autoQueueHandler = cs.getCapacitySchedulerQueueManager();
|
||||
MockNM nm1 = RM.registerNode("h1:1234", 1200 * GB); // label = x
|
||||
rm.registerNode("h1:1234", 1200 * GB); // label = x
|
||||
}
|
||||
|
||||
private LeafQueue createQueue(String queuePath) throws YarnException,
|
||||
private void createQueue(String queuePath) throws YarnException,
|
||||
IOException {
|
||||
return autoQueueHandler.createQueue(new QueuePath(queuePath));
|
||||
autoQueueHandler.createQueue(new QueuePath(queuePath));
|
||||
}
|
||||
|
||||
private JSONObject sendRequestToSchedulerEndpoint() throws Exception {
|
||||
WebResource r = resource();
|
||||
ClientResponse response = r.path("ws").path("v1").path("cluster")
|
||||
private ClientResponse sendRequest() {
|
||||
return resource().path("ws").path("v1").path("cluster")
|
||||
.path("scheduler").accept(MediaType.APPLICATION_JSON)
|
||||
.get(ClientResponse.class);
|
||||
assertEquals(MediaType.APPLICATION_JSON_TYPE + "; " + JettyUtils.UTF_8,
|
||||
response.getType().toString());
|
||||
String jsonString = response.getEntity(String.class);
|
||||
LOG.debug("Received JSON response: " + jsonString);
|
||||
return new JSONObject(jsonString);
|
||||
}
|
||||
|
||||
private void validateSchedulerInfo(JSONObject json, String expectedMode,
|
||||
ExpectedQueueWithProperties rootQueue,
|
||||
ExpectedQueueWithProperties... expectedQueues) throws JSONException {
|
||||
Assert.assertNotNull("SchedulerTypeInfo should not be null", json);
|
||||
assertEquals("incorrect number of elements in: " + json, 1, json.length());
|
||||
|
||||
JSONObject info = verifySchedulerJSONObject(json);
|
||||
info = verifySchedulerInfoJSONObject(expectedMode, rootQueue, info);
|
||||
JSONArray queueArray = verifyQueueJSONListObject(info,
|
||||
expectedQueues.length);
|
||||
verifyQueues(CapacitySchedulerConfiguration.ROOT, expectedMode,
|
||||
queueArray, expectedQueues);
|
||||
}
|
||||
|
||||
private void validateChildrenOfParent(JSONObject json,
|
||||
String parentPath, String expectedMode,
|
||||
ExpectedQueueWithProperties rootQueue,
|
||||
ExpectedQueueWithProperties... expectedLeafQueues) throws JSONException {
|
||||
Assert.assertNotNull("SchedulerTypeInfo should not be null", json);
|
||||
assertEquals("incorrect number of elements in: " + json, 1, json.length());
|
||||
|
||||
JSONObject info = verifySchedulerJSONObject(json);
|
||||
info = verifySchedulerInfoJSONObject(expectedMode, rootQueue, info);
|
||||
JSONArray queueArray = getQueuesJSONListObject(info);
|
||||
|
||||
Set<String> verifiedQueues = new HashSet<>();
|
||||
for (int i = 0; i < queueArray.length(); i++) {
|
||||
JSONObject childQueueObj = queueArray.getJSONObject(i);
|
||||
String queuePath = CapacitySchedulerConfiguration.ROOT + "." +
|
||||
childQueueObj.getString("queueName");
|
||||
if (queuePath.equals(parentPath)) {
|
||||
JSONArray childQueueArray = verifyQueueJSONListObject(childQueueObj,
|
||||
expectedLeafQueues.length);
|
||||
verifyQueues(parentPath, expectedMode, childQueueArray,
|
||||
expectedLeafQueues);
|
||||
verifiedQueues.add(queuePath);
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals("Not all child queues were found. " +
|
||||
String.format("Found queues: %s, All queues: %s", verifiedQueues,
|
||||
Arrays.stream(expectedLeafQueues).map(lq -> lq.path)
|
||||
.collect(Collectors.toList())),
|
||||
expectedLeafQueues.length, verifiedQueues.size());
|
||||
}
|
||||
|
||||
private JSONObject verifySchedulerJSONObject(JSONObject json)
|
||||
throws JSONException {
|
||||
JSONObject info = json.getJSONObject("scheduler");
|
||||
Assert.assertNotNull("Scheduler object should not be null", json);
|
||||
assertEquals("incorrect number of elements in: " + info, 1, info.length());
|
||||
return info;
|
||||
}
|
||||
|
||||
private JSONObject verifySchedulerInfoJSONObject(String expectedMode,
|
||||
ExpectedQueueWithProperties rootQueue, JSONObject info)
|
||||
throws JSONException {
|
||||
//Validate if root queue has the expected mode and weight values
|
||||
info = info.getJSONObject("schedulerInfo");
|
||||
Assert.assertNotNull("SchedulerInfo should not be null", info);
|
||||
Assert.assertEquals("Expected Queue mode " + expectedMode, expectedMode,
|
||||
info.getString("mode"));
|
||||
Assert.assertEquals(rootQueue.weight,
|
||||
Float.parseFloat(info.getString("weight")), DELTA);
|
||||
Assert.assertEquals(rootQueue.normalizedWeight,
|
||||
Float.parseFloat(info.getString("normalizedWeight")), DELTA);
|
||||
return info;
|
||||
}
|
||||
|
||||
private JSONArray verifyQueueJSONListObject(JSONObject info,
|
||||
int expectedQSize) throws JSONException {
|
||||
JSONArray queueArray = getQueuesJSONListObject(info);
|
||||
assertEquals("QueueInfoList should be size of " + expectedQSize,
|
||||
expectedQSize, queueArray.length());
|
||||
return queueArray;
|
||||
}
|
||||
|
||||
private JSONArray getQueuesJSONListObject(JSONObject info)
|
||||
throws JSONException {
|
||||
JSONObject queuesObj = info.getJSONObject("queues");
|
||||
Assert.assertNotNull("QueueInfoList should not be null", queuesObj);
|
||||
|
||||
JSONArray queueArray = queuesObj.getJSONArray("queue");
|
||||
Assert.assertNotNull("Queue list should not be null", queueArray);
|
||||
return queueArray;
|
||||
}
|
||||
|
||||
private void verifyQueues(String parentPath, String expectedMode,
|
||||
JSONArray queueArray, ExpectedQueueWithProperties[] expectedQueues)
|
||||
throws JSONException {
|
||||
Map<String, ExpectedQueueWithProperties> queuesMap = new HashMap<>();
|
||||
for (ExpectedQueueWithProperties expectedQueue : expectedQueues) {
|
||||
queuesMap.put(expectedQueue.path, expectedQueue);
|
||||
}
|
||||
|
||||
// Create mapping of queue path -> mode
|
||||
Map<String, String> modesMap = new HashMap<>();
|
||||
for (int i = 0; i < queueArray.length(); i++) {
|
||||
JSONObject obj = queueArray.getJSONObject(i);
|
||||
String queuePath = parentPath + "." + obj.getString("queueName");
|
||||
String mode = obj.getString("mode");
|
||||
modesMap.put(queuePath, mode);
|
||||
|
||||
//validate weights of all other queues
|
||||
ExpectedQueueWithProperties expectedQueue = queuesMap.get(queuePath);
|
||||
Assert.assertNotNull("Queue not found in expectedQueueMap with path: " +
|
||||
queuePath, expectedQueue);
|
||||
Assert.assertEquals("Weight value does not match",
|
||||
expectedQueue.weight, Float.parseFloat(obj.getString("weight")),
|
||||
DELTA);
|
||||
Assert.assertEquals("Normalized weight value does not match for queue " +
|
||||
queuePath,
|
||||
expectedQueue.normalizedWeight,
|
||||
Float.parseFloat(obj.getString("normalizedWeight")), DELTA);
|
||||
|
||||
//validate queue creation type
|
||||
Assert.assertEquals("Queue type does not match for queue " +
|
||||
queuePath,
|
||||
expectedQueue.queueType, obj.getString("queueType"));
|
||||
|
||||
Assert.assertEquals("Queue creation type does not match for queue " +
|
||||
queuePath,
|
||||
expectedQueue.creationMethod, obj.getString("creationMethod"));
|
||||
|
||||
if (!expectedQueue.autoQueueTemplateProperties.isEmpty()) {
|
||||
JSONArray templates = obj.getJSONObject("autoQueueTemplateProperties")
|
||||
.getJSONArray("property");
|
||||
for (int j = 0; j < templates.length(); j++) {
|
||||
JSONObject prop = templates.getJSONObject(j);
|
||||
Assert.assertEquals("Auto creation eligible queue " +
|
||||
"template key do not match for queue" + queuePath,
|
||||
expectedQueue.autoQueueTemplateProperties.get(j)[0],
|
||||
prop.getString("name"));
|
||||
Assert.assertEquals("Auto creation eligible queue " +
|
||||
"template value do not match for queue" + queuePath,
|
||||
expectedQueue.autoQueueTemplateProperties.get(j)[1],
|
||||
prop.getString("value"));
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals("Queue auto creation eligibility does not " +
|
||||
"match for queue " + queuePath,
|
||||
expectedQueue.autoCreationEligibility,
|
||||
obj.getString("autoCreationEligibility"));
|
||||
}
|
||||
|
||||
//Validate queue paths and modes
|
||||
List<String> sortedExpectedPaths = Arrays.stream(expectedQueues)
|
||||
.map(eq -> eq.path)
|
||||
.sorted(Comparator.comparing(String::toLowerCase))
|
||||
.collect(Collectors.toList());
|
||||
|
||||
List<String> sortedActualPaths = modesMap.keySet().stream()
|
||||
.sorted(Comparator.comparing(String::toLowerCase))
|
||||
.collect(Collectors.toList());
|
||||
Assert.assertEquals("Expected Queue paths: " + sortedExpectedPaths,
|
||||
sortedExpectedPaths, sortedActualPaths);
|
||||
|
||||
// Validate if we have a single "mode" for all queues
|
||||
Set<String> modesSet = new HashSet<>(modesMap.values());
|
||||
Assert.assertEquals("Expected a single Queue mode for all queues: " +
|
||||
expectedMode + ", got: " + modesMap, 1, modesSet.size());
|
||||
Assert.assertEquals("Expected Queue mode " + expectedMode,
|
||||
expectedMode, modesSet.iterator().next());
|
||||
}
|
||||
|
||||
private static class CSConfigGenerator {
|
||||
|
@ -583,7 +272,7 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
}
|
||||
|
||||
public static Configuration createWeightConfig() {
|
||||
return createWeightConfigInternal(false);
|
||||
return createWeightConfigInternal(false);
|
||||
}
|
||||
|
||||
public static Configuration createWeightConfigWithAutoQueueCreationEnabled() {
|
||||
|
@ -611,7 +300,7 @@ public class TestRMWebServicesCapacitySchedDynamicConfig extends
|
|||
Map<String, String> configs) {
|
||||
Configuration config = new Configuration();
|
||||
|
||||
for (Map.Entry<String, String> entry: configs.entrySet()) {
|
||||
for (Map.Entry<String, String> entry : configs.entrySet()) {
|
||||
config.set(entry.getKey(), entry.getValue());
|
||||
}
|
||||
return config;
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue