From a909a261e62fc4768069eb1a18c293f9f7b6f901 Mon Sep 17 00:00:00 2001 From: Sanford Ryza Date: Wed, 9 Oct 2013 09:21:23 +0000 Subject: [PATCH] YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1530543 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 2 + .../scheduler/fair/QueueManager.java | 106 ++++++++++-------- .../scheduler/fair/TestFairScheduler.java | 64 ++++++++++- 3 files changed, 124 insertions(+), 48 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 06fdfb2c8c2..d1ea4f2c684 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -44,6 +44,8 @@ Release 2.3.0 - UNRELEASED YARN-976. Document the meaning of a virtual core. (Sandy Ryza) + YARN-1258. Allow configuring the Fair Scheduler root queue (Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index b891381cd81..8f2fc1e9a1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -302,55 +302,70 @@ public void reloadAllocs() throws IOException, ParserConfigurationException, throw new AllocationConfigurationException("Bad fair scheduler config " + "file: top-level element not "); NodeList elements = root.getChildNodes(); + List queueElements = new ArrayList(); for (int i = 0; i < elements.getLength(); i++) { Node node = elements.item(i); - if (!(node instanceof Element)) - continue; - Element element = (Element)node; - if ("queue".equals(element.getTagName()) || - "pool".equals(element.getTagName())) { - loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); - } else if ("user".equals(element.getTagName())) { - String userName = element.getAttribute("name"); - NodeList fields = element.getChildNodes(); - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) - continue; - Element field = (Element) fieldNode; - if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - userMaxApps.put(userName, val); + if (node instanceof Element) { + Element element = (Element)node; + if ("queue".equals(element.getTagName()) || + "pool".equals(element.getTagName())) { + queueElements.add(element); + } else if ("user".equals(element.getTagName())) { + String userName = element.getAttribute("name"); + NodeList fields = element.getChildNodes(); + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("maxRunningApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + userMaxApps.put(userName, val); + } } + } else if ("userMaxAppsDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + userMaxAppsDefault = val; + } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + fairSharePreemptionTimeout = val; + } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + defaultMinSharePreemptionTimeout = val; + } else if ("queueMaxAppsDefault".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + queueMaxAppsDefault = val; + } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) + || "defaultQueueSchedulingMode".equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + SchedulingPolicy.setDefault(text); + defaultSchedPolicy = SchedulingPolicy.getDefault(); + } else { + LOG.warn("Bad element in allocations file: " + element.getTagName()); } - } else if ("userMaxAppsDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - userMaxAppsDefault = val; - } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - fairSharePreemptionTimeout = val; - } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - defaultMinSharePreemptionTimeout = val; - } else if ("queueMaxAppsDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - queueMaxAppsDefault = val; - } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) - || "defaultQueueSchedulingMode".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - SchedulingPolicy.setDefault(text); - defaultSchedPolicy = SchedulingPolicy.getDefault(); - } else { - LOG.warn("Bad element in allocations file: " + element.getTagName()); } } + + // Load queue elements. A root queue can either be included or omitted. If + // it's included, all other queues must be inside it. + for (Element element : queueElements) { + String parent = "root"; + if (element.getAttribute("name").equalsIgnoreCase("root")) { + if (queueElements.size() > 1) { + throw new AllocationConfigurationException("If configuring root queue," + + " no other queues can be placed alongside it."); + } + parent = null; + } + loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, + userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, + queueAcls, queueNamesInAllocFile); + } // Commit the reload; also create any queue defined in the alloc file // if it does not already exist, so it can be displayed on the web UI. @@ -398,7 +413,10 @@ private void loadQueue(String parentName, Element element, Map Map minSharePreemptionTimeouts, Map> queueAcls, List queueNamesInAllocFile) throws AllocationConfigurationException { - String queueName = parentName + "." + element.getAttribute("name"); + String queueName = element.getAttribute("name"); + if (parentName != null) { + queueName = parentName + "." + queueName; + } Map acls = new HashMap(); NodeList fields = element.getChildNodes(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 22c0e232af7..609b3f7817d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -923,14 +923,70 @@ public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAX Collection leafQueues = queueManager.getLeafQueues(); Assert.assertEquals(4, leafQueues.size()); - Assert.assertNotNull(queueManager.getLeafQueue("queueA", true)); - Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", true)); - Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", true)); - Assert.assertNotNull(queueManager.getLeafQueue("default", true)); + Assert.assertNotNull(queueManager.getLeafQueue("queueA", false)); + Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC", false)); + Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD", false)); + Assert.assertNotNull(queueManager.getLeafQueue("default", false)); // Make sure querying for queues didn't create any new ones: Assert.assertEquals(4, leafQueues.size()); } + @Test + public void testConfigureRootQueue() throws Exception { + Configuration conf = createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("fair"); + out.println(""); + out.println(" drf"); + out.println(" "); + out.println(" 1024mb,1vcores"); + out.println(" "); + out.println(" "); + out.println(" 1024mb,4vcores"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + FSQueue root = queueManager.getRootQueue(); + assertTrue(root.getPolicy() instanceof DominantResourceFairnessPolicy); + + assertNotNull(queueManager.getLeafQueue("child1", false)); + assertNotNull(queueManager.getLeafQueue("child2", false)); + } + + /** + * Verify that you can't place queues at the same level as the root queue in + * the allocations file. + */ + @Test (expected = AllocationConfigurationException.class) + public void testQueueAlongsideRoot() throws Exception { + Configuration conf = createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + } + @Test public void testBackwardsCompatibleAllocationFileParsing() throws Exception { Configuration conf = createConfiguration();