YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1530542 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-10-09 09:19:14 +00:00
parent 389d846e77
commit 643b1a4019
3 changed files with 124 additions and 48 deletions

View File

@ -59,6 +59,8 @@ Release 2.3.0 - UNRELEASED
YARN-976. Document the meaning of a virtual core. (Sandy Ryza) YARN-976. Document the meaning of a virtual core. (Sandy Ryza)
YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -302,56 +302,71 @@ public class QueueManager {
throw new AllocationConfigurationException("Bad fair scheduler config " + throw new AllocationConfigurationException("Bad fair scheduler config " +
"file: top-level element not <allocations>"); "file: top-level element not <allocations>");
NodeList elements = root.getChildNodes(); NodeList elements = root.getChildNodes();
List<Element> queueElements = new ArrayList<Element>();
for (int i = 0; i < elements.getLength(); i++) { for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i); Node node = elements.item(i);
if (!(node instanceof Element)) if (node instanceof Element) {
continue; Element element = (Element)node;
Element element = (Element)node; if ("queue".equals(element.getTagName()) ||
if ("queue".equals(element.getTagName()) || "pool".equals(element.getTagName())) {
"pool".equals(element.getTagName())) { queueElements.add(element);
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps, } else if ("user".equals(element.getTagName())) {
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, String userName = element.getAttribute("name");
queueAcls, queueNamesInAllocFile); NodeList fields = element.getChildNodes();
} else if ("user".equals(element.getTagName())) { for (int j = 0; j < fields.getLength(); j++) {
String userName = element.getAttribute("name"); Node fieldNode = fields.item(j);
NodeList fields = element.getChildNodes(); if (!(fieldNode instanceof Element))
for (int j = 0; j < fields.getLength(); j++) { continue;
Node fieldNode = fields.item(j); Element field = (Element) fieldNode;
if (!(fieldNode instanceof Element)) if ("maxRunningApps".equals(field.getTagName())) {
continue; String text = ((Text)field.getFirstChild()).getData().trim();
Element field = (Element) fieldNode; int val = Integer.parseInt(text);
if ("maxRunningApps".equals(field.getTagName())) { userMaxApps.put(userName, val);
String text = ((Text)field.getFirstChild()).getData().trim(); }
int val = Integer.parseInt(text);
userMaxApps.put(userName, val);
} }
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxAppsDefault = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeout = val;
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultMinSharePreemptionTimeout = val;
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
SchedulingPolicy.setDefault(text);
defaultSchedPolicy = SchedulingPolicy.getDefault();
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
} }
} else if ("userMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxAppsDefault = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeout = val;
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultMinSharePreemptionTimeout = val;
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxAppsDefault = val;
} else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|| "defaultQueueSchedulingMode".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
SchedulingPolicy.setDefault(text);
defaultSchedPolicy = SchedulingPolicy.getDefault();
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
} }
} }
// Load queue elements. A root queue can either be included or omitted. If
// it's included, all other queues must be inside it.
for (Element element : queueElements) {
String parent = "root";
if (element.getAttribute("name").equalsIgnoreCase("root")) {
if (queueElements.size() > 1) {
throw new AllocationConfigurationException("If configuring root queue,"
+ " no other queues can be placed alongside it.");
}
parent = null;
}
loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
}
// Commit the reload; also create any queue defined in the alloc file // Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI. // if it does not already exist, so it can be displayed on the web UI.
synchronized (this) { synchronized (this) {
@ -398,7 +413,10 @@ public class QueueManager {
Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile) Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
throws AllocationConfigurationException { throws AllocationConfigurationException {
String queueName = parentName + "." + element.getAttribute("name"); String queueName = element.getAttribute("name");
if (parentName != null) {
queueName = parentName + "." + queueName;
}
Map<QueueACL, AccessControlList> acls = Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>(); new HashMap<QueueACL, AccessControlList>();
NodeList fields = element.getChildNodes(); NodeList fields = element.getChildNodes();

View File

@ -923,14 +923,70 @@ public class TestFairScheduler {
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues(); Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
Assert.assertEquals(4, leafQueues.size()); Assert.assertEquals(4, leafQueues.size());
Assert.assertNotNull(queueManager.getLeafQueue("queueA", true)); Assert.assertNotNull(queueManager.getLeafQueue("queueA", false));
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", true)); Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", false));
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", true)); Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", false));
Assert.assertNotNull(queueManager.getLeafQueue("default", true)); Assert.assertNotNull(queueManager.getLeafQueue("default", false));
// Make sure querying for queues didn't create any new ones: // Make sure querying for queues didn't create any new ones:
Assert.assertEquals(4, leafQueues.size()); Assert.assertEquals(4, leafQueues.size());
} }
@Test
public void testConfigureRootQueue() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
out.println("<queue name=\"root\">");
out.println(" <schedulingPolicy>drf</schedulingPolicy>");
out.println(" <queue name=\"child1\">");
out.println(" <minResources>1024mb,1vcores</minResources>");
out.println(" </queue>");
out.println(" <queue name=\"child2\">");
out.println(" <minResources>1024mb,4vcores</minResources>");
out.println(" </queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
FSQueue root = queueManager.getRootQueue();
assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy);
assertNotNull(queueManager.getLeafQueue("child1", false));
assertNotNull(queueManager.getLeafQueue("child2", false));
}
/**
* Verify that you can't place queues at the same level as the root queue in
* the allocations file.
*/
@Test (expected = AllocationConfigurationException.class)
public void testQueueAlongsideRoot() throws Exception {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println("</queue>");
out.println("<queue name=\"other\">");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
}
@Test @Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception { public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = createConfiguration(); Configuration conf = createConfiguration();