YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1530543 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
dc0ccacb53
commit
a909a261e6
|
@ -44,6 +44,8 @@ Release 2.3.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-976. Document the meaning of a virtual core. (Sandy Ryza)
|
YARN-976. Document the meaning of a virtual core. (Sandy Ryza)
|
||||||
|
|
||||||
|
YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -302,56 +302,71 @@ public class QueueManager {
|
||||||
throw new AllocationConfigurationException("Bad fair scheduler config " +
|
throw new AllocationConfigurationException("Bad fair scheduler config " +
|
||||||
"file: top-level element not <allocations>");
|
"file: top-level element not <allocations>");
|
||||||
NodeList elements = root.getChildNodes();
|
NodeList elements = root.getChildNodes();
|
||||||
|
List<Element> queueElements = new ArrayList<Element>();
|
||||||
for (int i = 0; i < elements.getLength(); i++) {
|
for (int i = 0; i < elements.getLength(); i++) {
|
||||||
Node node = elements.item(i);
|
Node node = elements.item(i);
|
||||||
if (!(node instanceof Element))
|
if (node instanceof Element) {
|
||||||
continue;
|
Element element = (Element)node;
|
||||||
Element element = (Element)node;
|
if ("queue".equals(element.getTagName()) ||
|
||||||
if ("queue".equals(element.getTagName()) ||
|
"pool".equals(element.getTagName())) {
|
||||||
"pool".equals(element.getTagName())) {
|
queueElements.add(element);
|
||||||
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
|
} else if ("user".equals(element.getTagName())) {
|
||||||
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
String userName = element.getAttribute("name");
|
||||||
queueAcls, queueNamesInAllocFile);
|
NodeList fields = element.getChildNodes();
|
||||||
} else if ("user".equals(element.getTagName())) {
|
for (int j = 0; j < fields.getLength(); j++) {
|
||||||
String userName = element.getAttribute("name");
|
Node fieldNode = fields.item(j);
|
||||||
NodeList fields = element.getChildNodes();
|
if (!(fieldNode instanceof Element))
|
||||||
for (int j = 0; j < fields.getLength(); j++) {
|
continue;
|
||||||
Node fieldNode = fields.item(j);
|
Element field = (Element) fieldNode;
|
||||||
if (!(fieldNode instanceof Element))
|
if ("maxRunningApps".equals(field.getTagName())) {
|
||||||
continue;
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||||
Element field = (Element) fieldNode;
|
int val = Integer.parseInt(text);
|
||||||
if ("maxRunningApps".equals(field.getTagName())) {
|
userMaxApps.put(userName, val);
|
||||||
String text = ((Text)field.getFirstChild()).getData().trim();
|
}
|
||||||
int val = Integer.parseInt(text);
|
|
||||||
userMaxApps.put(userName, val);
|
|
||||||
}
|
}
|
||||||
|
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
|
||||||
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
|
int val = Integer.parseInt(text);
|
||||||
|
userMaxAppsDefault = val;
|
||||||
|
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
|
||||||
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
|
long val = Long.parseLong(text) * 1000L;
|
||||||
|
fairSharePreemptionTimeout = val;
|
||||||
|
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
|
||||||
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
|
long val = Long.parseLong(text) * 1000L;
|
||||||
|
defaultMinSharePreemptionTimeout = val;
|
||||||
|
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
|
||||||
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
|
int val = Integer.parseInt(text);
|
||||||
|
queueMaxAppsDefault = val;
|
||||||
|
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|
||||||
|
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
|
||||||
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
||||||
|
SchedulingPolicy.setDefault(text);
|
||||||
|
defaultSchedPolicy = SchedulingPolicy.getDefault();
|
||||||
|
} else {
|
||||||
|
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
||||||
}
|
}
|
||||||
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
|
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
||||||
int val = Integer.parseInt(text);
|
|
||||||
userMaxAppsDefault = val;
|
|
||||||
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
|
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
||||||
long val = Long.parseLong(text) * 1000L;
|
|
||||||
fairSharePreemptionTimeout = val;
|
|
||||||
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
|
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
||||||
long val = Long.parseLong(text) * 1000L;
|
|
||||||
defaultMinSharePreemptionTimeout = val;
|
|
||||||
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
|
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
||||||
int val = Integer.parseInt(text);
|
|
||||||
queueMaxAppsDefault = val;
|
|
||||||
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|
|
||||||
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
|
|
||||||
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
||||||
SchedulingPolicy.setDefault(text);
|
|
||||||
defaultSchedPolicy = SchedulingPolicy.getDefault();
|
|
||||||
} else {
|
|
||||||
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Load queue elements. A root queue can either be included or omitted. If
|
||||||
|
// it's included, all other queues must be inside it.
|
||||||
|
for (Element element : queueElements) {
|
||||||
|
String parent = "root";
|
||||||
|
if (element.getAttribute("name").equalsIgnoreCase("root")) {
|
||||||
|
if (queueElements.size() > 1) {
|
||||||
|
throw new AllocationConfigurationException("If configuring root queue,"
|
||||||
|
+ " no other queues can be placed alongside it.");
|
||||||
|
}
|
||||||
|
parent = null;
|
||||||
|
}
|
||||||
|
loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
|
||||||
|
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
||||||
|
queueAcls, queueNamesInAllocFile);
|
||||||
|
}
|
||||||
|
|
||||||
// Commit the reload; also create any queue defined in the alloc file
|
// Commit the reload; also create any queue defined in the alloc file
|
||||||
// if it does not already exist, so it can be displayed on the web UI.
|
// if it does not already exist, so it can be displayed on the web UI.
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
|
@ -398,7 +413,10 @@ public class QueueManager {
|
||||||
Map<String, Long> minSharePreemptionTimeouts,
|
Map<String, Long> minSharePreemptionTimeouts,
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
|
||||||
throws AllocationConfigurationException {
|
throws AllocationConfigurationException {
|
||||||
String queueName = parentName + "." + element.getAttribute("name");
|
String queueName = element.getAttribute("name");
|
||||||
|
if (parentName != null) {
|
||||||
|
queueName = parentName + "." + queueName;
|
||||||
|
}
|
||||||
Map<QueueACL, AccessControlList> acls =
|
Map<QueueACL, AccessControlList> acls =
|
||||||
new HashMap<QueueACL, AccessControlList>();
|
new HashMap<QueueACL, AccessControlList>();
|
||||||
NodeList fields = element.getChildNodes();
|
NodeList fields = element.getChildNodes();
|
||||||
|
|
|
@ -923,14 +923,70 @@ public class TestFairScheduler {
|
||||||
|
|
||||||
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
|
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
|
||||||
Assert.assertEquals(4, leafQueues.size());
|
Assert.assertEquals(4, leafQueues.size());
|
||||||
Assert.assertNotNull(queueManager.getLeafQueue("queueA", true));
|
Assert.assertNotNull(queueManager.getLeafQueue("queueA", false));
|
||||||
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", true));
|
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", false));
|
||||||
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", true));
|
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", false));
|
||||||
Assert.assertNotNull(queueManager.getLeafQueue("default", true));
|
Assert.assertNotNull(queueManager.getLeafQueue("default", false));
|
||||||
// Make sure querying for queues didn't create any new ones:
|
// Make sure querying for queues didn't create any new ones:
|
||||||
Assert.assertEquals(4, leafQueues.size());
|
Assert.assertEquals(4, leafQueues.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConfigureRootQueue() throws Exception {
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
|
||||||
|
out.println(" <queue name=\"child1\">");
|
||||||
|
out.println(" <minResources>1024mb,1vcores</minResources>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println(" <queue name=\"child2\">");
|
||||||
|
out.println(" <minResources>1024mb,4vcores</minResources>");
|
||||||
|
out.println(" </queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
QueueManager queueManager = scheduler.getQueueManager();
|
||||||
|
queueManager.initialize();
|
||||||
|
|
||||||
|
FSQueue root = queueManager.getRootQueue();
|
||||||
|
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
|
||||||
|
|
||||||
|
assertNotNull(queueManager.getLeafQueue("child1", false));
|
||||||
|
assertNotNull(queueManager.getLeafQueue("child2", false));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that you can't place queues at the same level as the root queue in
|
||||||
|
* the allocations file.
|
||||||
|
*/
|
||||||
|
@Test (expected = AllocationConfigurationException.class)
|
||||||
|
public void testQueueAlongsideRoot() throws Exception {
|
||||||
|
Configuration conf = createConfiguration();
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"other\">");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
QueueManager queueManager = scheduler.getQueueManager();
|
||||||
|
queueManager.initialize();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
|
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
|
||||||
Configuration conf = createConfiguration();
|
Configuration conf = createConfiguration();
|
||||||
|
|
Loading…
Reference in New Issue