diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8182d8cea41..a7fa9439b0e 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -8,6 +8,9 @@ Release 2.5.0 - UNRELEASED YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha) + YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar + via Sandy Ryza) + IMPROVEMENTS YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 6fc90f472c1..0f9d9069204 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -77,19 +77,22 @@ public class AllocationConfiguration { @VisibleForTesting QueuePlacementPolicy placementPolicy; + //Configured queues in the alloc xml @VisibleForTesting - Set queueNames; + Map> configuredQueues; - public AllocationConfiguration(Map minQueueResources, - Map maxQueueResources, + public AllocationConfiguration(Map minQueueResources, + Map maxQueueResources, Map queueMaxApps, Map userMaxApps, Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, Map schedulingPolicies, + int queueMaxAppsDefault, + Map schedulingPolicies, SchedulingPolicy defaultSchedulingPolicy, - Map minSharePreemptionTimeouts, + Map minSharePreemptionTimeouts, Map> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, - QueuePlacementPolicy placementPolicy, Set queueNames) { + QueuePlacementPolicy placementPolicy, + Map> configuredQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -104,7 +107,7 @@ public AllocationConfiguration(Map minQueueResources, this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; this.placementPolicy = placementPolicy; - this.queueNames = queueNames; + this.configuredQueues = configuredQueues; } public AllocationConfiguration(Configuration conf) { @@ -121,9 +124,12 @@ public AllocationConfiguration(Configuration conf) { fairSharePreemptionTimeout = Long.MAX_VALUE; schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; + configuredQueues = new HashMap>(); + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet()); + } placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, - new HashSet()); - queueNames = new HashSet(); + configuredQueues); } /** @@ -221,8 +227,8 @@ public SchedulingPolicy getDefaultSchedulingPolicy() { return defaultSchedulingPolicy; } - public Set getQueueNames() { - return queueNames; + public Map> getConfiguredQueues() { + return configuredQueues; } public QueuePlacementPolicy getPlacementPolicy() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 69dcf89b628..bedbb64cd2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -214,8 +214,15 @@ public synchronized void reloadAllocations() throws IOException, QueuePlacementPolicy newPlacementPolicy = null; // Remember all queue names so we can display them on web UI, etc. - Set queueNamesInAllocFile = new HashSet(); - + // configuredQueues is segregated based on whether it is a leaf queue + // or a parent queue. This information is used for creating queues + // and also for making queue placement decisions(QueuePlacementRule.java). + Map> configuredQueues = + new HashMap>(); + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet()); + } + // Read and parse the allocations file. DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance(); @@ -289,26 +296,27 @@ public synchronized void reloadAllocations() throws IOException, } parent = null; } - loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); + loadQueue(parent, element, minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueWeights, queuePolicies, + minSharePreemptionTimeouts, queueAcls, + configuredQueues); } // Load placement policy and pass it configured queues Configuration conf = getConfig(); if (placementPolicyElement != null) { newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, - queueNamesInAllocFile, conf); + configuredQueues, conf); } else { newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, - queueNamesInAllocFile); + configuredQueues); } AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, - newPlacementPolicy, queueNamesInAllocFile); + newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -324,7 +332,8 @@ private void loadQueue(String parentName, Element element, Map Map userMaxApps, Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, - Map> queueAcls, Set queueNamesInAllocFile) + Map> queueAcls, + Map> configuredQueues) throws AllocationConfigurationException { String queueName = element.getAttribute("name"); if (parentName != null) { @@ -375,13 +384,19 @@ private void loadQueue(String parentName, Element element, Map "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queuePolicies, - minSharePreemptionTimeouts, - queueAcls, queueNamesInAllocFile); + minSharePreemptionTimeouts, queueAcls, configuredQueues); + configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } } if (isLeaf) { - queueNamesInAllocFile.add(queueName); + // if a leaf in the alloc file is marked as type='parent' + // then store it under 'parent' + if ("parent".equals(element.getAttribute("type"))) { + configuredQueues.get(FSQueueType.PARENT).add(queueName); + } else { + configuredQueues.get(FSQueueType.LEAF).add(queueName); + } } queueAcls.put(queueName, acls); if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java new file mode 100644 index 00000000000..c201735678f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +public enum FSQueueType { + /* + * Represents a leaf queue + */ + LEAF, + + /* + * Represents a parent queue + */ + PARENT +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index f6f51602557..44918f33f82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -74,7 +74,7 @@ public void initialize(Configuration conf) throws IOException, } /** - * Get a queue by name, creating it if the create param is true and is necessary. + * Get a leaf queue by name, creating it if the create param is true and is necessary. * If the queue is not or can not be a leaf queue, i.e. it already exists as a * parent queue, or one of the parents in its name is already a leaf queue, * null is returned. @@ -85,31 +85,53 @@ public void initialize(Configuration conf) throws IOException, * could be referred to as just "parent1.queue2". */ public FSLeafQueue getLeafQueue(String name, boolean create) { + FSQueue queue = getQueue(name, create, FSQueueType.LEAF); + if (queue instanceof FSParentQueue) { + return null; + } + return (FSLeafQueue) queue; + } + + /** + * Get a parent queue by name, creating it if the create param is true and is necessary. + * If the queue is not or can not be a parent queue, i.e. it already exists as a + * leaf queue, or one of the parents in its name is already a leaf queue, + * null is returned. + * + * The root part of the name is optional, so a queue underneath the root + * named "queue1" could be referred to as just "queue1", and a queue named + * "queue2" underneath a parent named "parent1" that is underneath the root + * could be referred to as just "parent1.queue2". + */ + public FSParentQueue getParentQueue(String name, boolean create) { + FSQueue queue = getQueue(name, create, FSQueueType.PARENT); + if (queue instanceof FSLeafQueue) { + return null; + } + return (FSParentQueue) queue; + } + + private FSQueue getQueue(String name, boolean create, FSQueueType queueType) { name = ensureRootPrefix(name); synchronized (queues) { FSQueue queue = queues.get(name); if (queue == null && create) { - FSLeafQueue leafQueue = createLeafQueue(name); - if (leafQueue == null) { - return null; - } - queue = leafQueue; - } else if (queue instanceof FSParentQueue) { - return null; + // if the queue doesn't exist,create it and return + queue = createQueue(name, queueType); } - return (FSLeafQueue)queue; + return queue; } } /** - * Creates a leaf queue and places it in the tree. Creates any - * parents that don't already exist. + * Creates a leaf or parent queue based on what is specified in 'queueType' + * and places it in the tree. Creates any parents that don't already exist. * * @return * the created queue, if successful. null if not allowed (one of the parent * queues in the queue name is already a leaf queue) */ - private FSLeafQueue createLeafQueue(String name) { + private FSQueue createQueue(String name, FSQueueType queueType) { List newQueueNames = new ArrayList(); newQueueNames.add(name); int sepIndex = name.length(); @@ -143,8 +165,7 @@ private FSLeafQueue createLeafQueue(String name) { FSLeafQueue leafQueue = null; for (int i = newQueueNames.size()-1; i >= 0; i--) { String queueName = newQueueNames.get(i); - if (i == 0) { - // First name added was the leaf queue + if (i == 0 && queueType != FSQueueType.PARENT) { leafQueue = new FSLeafQueue(name, scheduler, parent); try { leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); @@ -155,6 +176,7 @@ private FSLeafQueue createLeafQueue(String name) { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); + return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); try { @@ -169,53 +191,64 @@ private FSLeafQueue createLeafQueue(String name) { } } - return leafQueue; + return parent; } /** - * Make way for the given leaf queue if possible, by removing incompatible + * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to - * (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in - * the ancestry of leafToCreate. + * (1) queueToCreate being currently a parent but needs to change to leaf + * (2) queueToCreate being currently a leaf but needs to change to parent + * (3) an existing leaf queue in the ancestry of queueToCreate. * * We will never remove the root queue or the default queue in this way. * - * @return true if we can create leafToCreate or it already exists. + * @return true if we can create queueToCreate or it already exists. */ - private boolean removeEmptyIncompatibleQueues(String leafToCreate) { - leafToCreate = ensureRootPrefix(leafToCreate); + private boolean removeEmptyIncompatibleQueues(String queueToCreate, + FSQueueType queueType) { + queueToCreate = ensureRootPrefix(queueToCreate); - // Ensure leafToCreate is not root and doesn't have the default queue in its + // Ensure queueToCreate is not root and doesn't have the default queue in its // ancestry. - if (leafToCreate.equals(ROOT_QUEUE) || - leafToCreate.startsWith( + if (queueToCreate.equals(ROOT_QUEUE) || + queueToCreate.startsWith( ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) { return false; } - FSQueue queue = queues.get(leafToCreate); + FSQueue queue = queues.get(queueToCreate); // Queue exists already. if (queue != null) { if (queue instanceof FSLeafQueue) { - // If it's an already existing leaf, we're ok. - return true; + if (queueType == FSQueueType.LEAF) { + // if queue is already a leaf then return true + return true; + } + // remove incompatibility since queue is a leaf currently + // needs to change to a parent. + return removeQueueIfEmpty(queue); } else { - // If it's an existing parent queue, remove it if it's empty. + if (queueType == FSQueueType.PARENT) { + return true; + } + // If it's an existing parent queue and needs to change to leaf, + // remove it if it's empty. return removeQueueIfEmpty(queue); } } // Queue doesn't exist already. Check if the new queue would be created // under an existing leaf queue. If so, try removing that leaf queue. - int sepIndex = leafToCreate.length(); - sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1); + int sepIndex = queueToCreate.length(); + sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); while (sepIndex != -1) { - String prefixString = leafToCreate.substring(0, sepIndex); + String prefixString = queueToCreate.substring(0, sepIndex); FSQueue prefixQueue = queues.get(prefixString); if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) { return removeQueueIfEmpty(prefixQueue); } - sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1); + sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1); } return true; } @@ -312,12 +345,21 @@ private String ensureRootPrefix(String name) { } public void updateAllocationConfiguration(AllocationConfiguration queueConf) { - // Make sure all queues exist - for (String name : queueConf.getQueueNames()) { - if (removeEmptyIncompatibleQueues(name)) { + // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist + for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) { + if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) { getLeafQueue(name, true); } } + + // At this point all leaves and 'parents with at least one child' would have been created. + // Now create parents with no configured leaf. + for (String name : queueConf.getConfiguredQueues().get( + FSQueueType.PARENT)) { + if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) { + getParentQueue(name, true); + } + } for (FSQueue queue : queues.values()) { // Update queue metrics diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java index 1fd18e7c971..3ab6036d965 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java @@ -42,17 +42,19 @@ public class QueuePlacementPolicy { map.put("secondaryGroupExistingQueue", QueuePlacementRule.SecondaryGroupExistingQueue.class); map.put("specified", QueuePlacementRule.Specified.class); + map.put("nestedUserQueue", + QueuePlacementRule.NestedUserQueue.class); map.put("default", QueuePlacementRule.Default.class); map.put("reject", QueuePlacementRule.Reject.class); ruleClasses = Collections.unmodifiableMap(map); } private final List rules; - private final Set configuredQueues; + private final Map> configuredQueues; private final Groups groups; public QueuePlacementPolicy(List rules, - Set configuredQueues, Configuration conf) + Map> configuredQueues, Configuration conf) throws AllocationConfigurationException { for (int i = 0; i < rules.size()-1; i++) { if (rules.get(i).isTerminal()) { @@ -72,40 +74,53 @@ public QueuePlacementPolicy(List rules, /** * Builds a QueuePlacementPolicy from an xml element. */ - public static QueuePlacementPolicy fromXml(Element el, Set configuredQueues, - Configuration conf) throws AllocationConfigurationException { + public static QueuePlacementPolicy fromXml(Element el, + Map> configuredQueues, Configuration conf) + throws AllocationConfigurationException { List rules = new ArrayList(); NodeList elements = el.getChildNodes(); for (int i = 0; i < elements.getLength(); i++) { Node node = elements.item(i); if (node instanceof Element) { - Element element = (Element)node; - - String ruleName = element.getAttribute("name"); - if ("".equals(ruleName)) { - throw new AllocationConfigurationException("No name provided for a " + - "rule element"); - } - - Class clazz = ruleClasses.get(ruleName); - if (clazz == null) { - throw new AllocationConfigurationException("No rule class found for " - + ruleName); - } - QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); - rule.initializeFromXml(element); + QueuePlacementRule rule = createAndInitializeRule(node); rules.add(rule); } } return new QueuePlacementPolicy(rules, configuredQueues, conf); } + /** + * Create and initialize a rule given a xml node + * @param node + * @return QueuePlacementPolicy + * @throws AllocationConfigurationException + */ + public static QueuePlacementRule createAndInitializeRule(Node node) + throws AllocationConfigurationException { + Element element = (Element) node; + + String ruleName = element.getAttribute("name"); + if ("".equals(ruleName)) { + throw new AllocationConfigurationException("No name provided for a " + + "rule element"); + } + + Class clazz = ruleClasses.get(ruleName); + if (clazz == null) { + throw new AllocationConfigurationException("No rule class found for " + + ruleName); + } + QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); + rule.initializeFromXml(element); + return rule; + } + /** * Build a simple queue placement policy from the allow-undeclared-pools and * user-as-default-queue configuration options. */ public static QueuePlacementPolicy fromConfiguration(Configuration conf, - Set configuredQueues) { + Map> configuredQueues) { boolean create = conf.getBoolean( FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java index 6acba27479f..b115ecf5af4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java @@ -18,16 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.hadoop.security.Groups; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.w3c.dom.Element; import org.w3c.dom.NamedNodeMap; import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +import com.google.common.annotations.VisibleForTesting; public abstract class QueuePlacementRule { protected boolean create; @@ -58,16 +61,20 @@ public QueuePlacementRule initialize(boolean create, Map args) { * continue to the next rule, and null indicates that the app should be rejected. */ public String assignAppToQueue(String requestedQueue, String user, - Groups groups, Collection configuredQueues) throws IOException { - String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues); - if (create || configuredQueues.contains(queue)) { + Groups groups, Map> configuredQueues) + throws IOException { + String queue = getQueueForApp(requestedQueue, user, groups, + configuredQueues); + if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue) + || configuredQueues.get(FSQueueType.PARENT).contains(queue)) { return queue; } else { return ""; } } - public void initializeFromXml(Element el) { + public void initializeFromXml(Element el) + throws AllocationConfigurationException { boolean create = true; NamedNodeMap attributes = el.getAttributes(); Map args = new HashMap(); @@ -104,15 +111,16 @@ public void initializeFromXml(Element el) { * continue to the next rule. */ protected abstract String getQueueForApp(String requestedQueue, String user, - Groups groups, Collection configuredQueues) throws IOException; + Groups groups, Map> configuredQueues) + throws IOException; /** * Places apps in queues by username of the submitter */ public static class User extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, Collection configuredQueues) { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) { return "root." + user; } @@ -127,9 +135,9 @@ public boolean isTerminal() { */ public static class PrimaryGroup extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, - Collection configuredQueues) throws IOException { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) + throws IOException { return "root." + groups.getGroups(user).get(0); } @@ -147,12 +155,15 @@ public boolean isTerminal() { */ public static class SecondaryGroupExistingQueue extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, - Collection configuredQueues) throws IOException { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) + throws IOException { List groupNames = groups.getGroups(user); for (int i = 1; i < groupNames.size(); i++) { - if (configuredQueues.contains("root." + groupNames.get(i))) { + String group = groupNames.get(i); + if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group) + || configuredQueues.get(FSQueueType.PARENT).contains( + "root." + group)) { return "root." + groupNames.get(i); } } @@ -166,13 +177,84 @@ public boolean isTerminal() { } } + /** + * Places apps in queues with name of the submitter under the queue + * returned by the nested rule. + */ + public static class NestedUserQueue extends QueuePlacementRule { + @VisibleForTesting + QueuePlacementRule nestedRule; + + /** + * Parse xml and instantiate the nested rule + */ + @Override + public void initializeFromXml(Element el) + throws AllocationConfigurationException { + NodeList elements = el.getChildNodes(); + + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element) node; + if ("rule".equals(element.getTagName())) { + QueuePlacementRule rule = QueuePlacementPolicy + .createAndInitializeRule(node); + if (rule == null) { + throw new AllocationConfigurationException( + "Unable to create nested rule in nestedUserQueue rule"); + } + this.nestedRule = rule; + break; + } else { + continue; + } + } + } + + if (this.nestedRule == null) { + throw new AllocationConfigurationException( + "No nested rule specified in rule"); + } + super.initializeFromXml(el); + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) + throws IOException { + // Apply the nested rule + String queueName = nestedRule.assignAppToQueue(requestedQueue, user, + groups, configuredQueues); + + if (queueName != null && queueName != "") { + if (!queueName.startsWith("root.")) { + queueName = "root." + queueName; + } + + // Verify if the queue returned by the nested rule is an configured leaf queue, + // if yes then skip to next rule in the queue placement policy + if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) { + return ""; + } + return queueName + "." + user; + } + return queueName; + } + + @Override + public boolean isTerminal() { + return false; + } + } + /** * Places apps in queues by requested queue of the submitter */ public static class Specified extends QueuePlacementRule { @Override - protected String getQueueForApp(String requestedQueue, - String user, Groups groups, Collection configuredQueues) { + protected String getQueueForApp(String requestedQueue, String user, + Groups groups, Map> configuredQueues) { if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) { return ""; } else { @@ -195,7 +277,7 @@ public boolean isTerminal() { public static class Default extends QueuePlacementRule { @Override protected String getQueueForApp(String requestedQueue, String user, - Groups groups, Collection configuredQueues) { + Groups groups, Map> configuredQueues) { return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; } @@ -211,13 +293,13 @@ public boolean isTerminal() { public static class Reject extends QueuePlacementRule { @Override public String assignAppToQueue(String requestedQueue, String user, - Groups groups, Collection configuredQueues) { + Groups groups, Map> configuredQueues) { return null; } @Override protected String getQueueForApp(String requestedQueue, String user, - Groups groups, Collection configuredQueues) { + Groups groups, Map> configuredQueues) { throw new UnsupportedOperationException(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 594543b3863..2a725d8bbf4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; import java.io.File; import java.io.FileWriter; @@ -28,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.util.Clock; @@ -99,9 +99,12 @@ public void testReload() throws Exception { assertEquals(1, rules.size()); assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass()); assertEquals(1, allocConf.getQueueMaxApps("root.queueA")); - assertEquals(2, allocConf.getQueueNames().size()); - assertTrue(allocConf.getQueueNames().contains("root.queueA")); - assertTrue(allocConf.getQueueNames().contains("root.queueB")); + assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF) + .size()); + assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF) + .contains("root.queueA")); + assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF) + .contains("root.queueB")); confHolder.allocConf = null; @@ -114,6 +117,9 @@ public void testReload() throws Exception { out.println(" "); out.println(" "); out.println(" "); + out.println(" "); + out.println(" "); + out.println(" "); out.println(" "); out.println(" "); out.println(""); @@ -131,12 +137,18 @@ public void testReload() throws Exception { allocConf = confHolder.allocConf; policy = allocConf.getPlacementPolicy(); rules = policy.getRules(); - assertEquals(2, rules.size()); + assertEquals(3, rules.size()); assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass()); - assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass()); + assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1) + .getClass()); + assertEquals(QueuePlacementRule.PrimaryGroup.class, + ((NestedUserQueue) (rules.get(1))).nestedRule.getClass()); + assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass()); assertEquals(3, allocConf.getQueueMaxApps("root.queueB")); - assertEquals(1, allocConf.getQueueNames().size()); - assertTrue(allocConf.getQueueNames().contains("root.queueB")); + assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF) + .size()); + assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF) + .contains("root.queueB")); } @Test @@ -170,6 +182,14 @@ public void testAllocationFileParsing() throws Exception { out.println(""); out.println("60"); out.println(""); + //Make queue F a parent queue without configured leaf queues using the 'type' attribute + out.println(""); + out.println(""); + //Create hierarchical queues G,H + out.println(""); + out.println(" "); + out.println(" "); + out.println(""); // Set default limit of apps per queue to 15 out.println("15"); // Set default limit of apps per user to 5 @@ -194,7 +214,7 @@ public void testAllocationFileParsing() throws Exception { allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; - assertEquals(5, queueConf.getQueueNames().size()); + assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size()); assertEquals(Resources.createResource(0), queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(0), @@ -250,6 +270,14 @@ public void testAllocationFileParsing() throws Exception { assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); + assertTrue(queueConf.getConfiguredQueues() + .get(FSQueueType.PARENT) + .contains("root.queueF")); + assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT) + .contains("root.queueG")); + assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF) + .contains("root.queueG.queueH")); + // Verify existing queues have default scheduling policy assertEquals(DominantResourceFairnessPolicy.NAME, queueConf.getSchedulingPolicy("root").getName()); @@ -315,7 +343,7 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; - assertEquals(5, queueConf.getQueueNames().size()); + assertEquals(5, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size()); assertEquals(Resources.createResource(0), queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(0), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 4bc4a2b61e1..1a38eefefd7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -34,6 +34,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -43,7 +44,6 @@ import javax.xml.parsers.ParserConfigurationException; import org.junit.Assert; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -737,8 +738,11 @@ public void testQueuePlacementWithPolicy() throws Exception { rules.add(new QueuePlacementRule.Default().initialize(true, null)); Set queues = Sets.newHashSet("root.user1", "root.user3group", "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2"); + Map> configuredQueues = new HashMap>(); + configuredQueues.put(FSQueueType.LEAF, queues); + configuredQueues.put(FSQueueType.PARENT, new HashSet()); scheduler.getAllocationConfiguration().placementPolicy = - new QueuePlacementPolicy(rules, queues, conf); + new QueuePlacementPolicy(rules, configuredQueues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "default", "user1"); @@ -758,7 +762,7 @@ public void testQueuePlacementWithPolicy() throws Exception { rules.add(new QueuePlacementRule.Specified().initialize(true, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null)); scheduler.getAllocationConfiguration().placementPolicy = - new QueuePlacementPolicy(rules, queues, conf); + new QueuePlacementPolicy(rules, configuredQueues, conf); appId = createSchedulingRequest(1024, "somequeue", "user1"); assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); appId = createSchedulingRequest(1024, "somequeue", "otheruser"); @@ -809,7 +813,89 @@ else if (p.getName().equals("root.queueB")) { } } } + + @Test + public void testNestedUserQueue() throws IOException { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); + + FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default", + "user1"); + + assertEquals("root.user1group.user1", user1Leaf.getName()); + } + + @Test + public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + + out.println(""); + out.println(""); + out.println(""); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(" "); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW); + RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW); + + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + int capacity = 16 * 1024; + // create node with 16 G + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity), + 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // user1,user2 submit their apps to parentq and create user queues + scheduler.assignToQueue(rmApp1, "root.parentq", "user1"); + scheduler.assignToQueue(rmApp2, "root.parentq", "user2"); + + scheduler.update(); + + Collection leafQueues = scheduler.getQueueManager() + .getLeafQueues(); + + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().equals("root.parentq.user1") + || leaf.getName().equals("root.parentq.user2")) { + // assert that the fair share is 1/4th node1's capacity + assertEquals(capacity / 4, leaf.getFairShare().getMemory()); + // assert weights are equal for both the user queues + assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0); + } + } + } + /** * Make allocation requests and ensure they are reflected in queue demand. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java index 66ce394276a..ef0ec7e2d78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java @@ -17,8 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.util.HashSet; @@ -57,45 +56,77 @@ public boolean isEmpty(FSQueue queue) { @Test public void testReloadTurnsLeafQueueIntoParent() throws Exception { - updateConfiguredQueues(queueManager, "queue1"); + updateConfiguredLeafQueues(queueManager, "queue1"); // When no apps are running in the leaf queue, should be fine turning it // into a parent. - updateConfiguredQueues(queueManager, "queue1.queue2"); + updateConfiguredLeafQueues(queueManager, "queue1.queue2"); assertNull(queueManager.getLeafQueue("queue1", false)); assertNotNull(queueManager.getLeafQueue("queue1.queue2", false)); // When leaf queues are empty, should be ok deleting them and // turning parent into a leaf. - updateConfiguredQueues(queueManager, "queue1"); + updateConfiguredLeafQueues(queueManager, "queue1"); assertNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNotNull(queueManager.getLeafQueue("queue1", false)); // When apps exist in leaf queue, we shouldn't be able to create // children under it, but things should work otherwise. notEmptyQueues.add(queueManager.getLeafQueue("queue1", false)); - updateConfiguredQueues(queueManager, "queue1.queue2"); + updateConfiguredLeafQueues(queueManager, "queue1.queue2"); assertNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNotNull(queueManager.getLeafQueue("queue1", false)); // When apps exist in leaf queues under a parent queue, shouldn't be // able to turn it into a leaf queue, but things should work otherwise. notEmptyQueues.clear(); - updateConfiguredQueues(queueManager, "queue1.queue2"); + updateConfiguredLeafQueues(queueManager, "queue1.queue2"); notEmptyQueues.add(queueManager.getQueue("root.queue1")); - updateConfiguredQueues(queueManager, "queue1"); + updateConfiguredLeafQueues(queueManager, "queue1"); assertNotNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNull(queueManager.getLeafQueue("queue1", false)); // Should never to be able to create a queue under the default queue - updateConfiguredQueues(queueManager, "default.queue3"); + updateConfiguredLeafQueues(queueManager, "default.queue3"); assertNull(queueManager.getLeafQueue("default.queue3", false)); assertNotNull(queueManager.getLeafQueue("default", false)); } - private void updateConfiguredQueues(QueueManager queueMgr, String... confQueues) { + @Test + public void testReloadTurnsLeafToParentWithNoLeaf() { AllocationConfiguration allocConf = new AllocationConfiguration(conf); - allocConf.queueNames = Sets.newHashSet(confQueues); + // Create a leaf queue1 + allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1"); + queueManager.updateAllocationConfiguration(allocConf); + assertNotNull(queueManager.getLeafQueue("root.queue1", false)); + + // Lets say later on admin makes queue1 a parent queue by + // specifying "type=parent" in the alloc xml and lets say apps running in + // queue1 + notEmptyQueues.add(queueManager.getLeafQueue("root.queue1", false)); + allocConf = new AllocationConfiguration(conf); + allocConf.configuredQueues.get(FSQueueType.PARENT) + .add("root.queue1"); + + // When allocs are reloaded queue1 shouldn't be converter to parent + queueManager.updateAllocationConfiguration(allocConf); + assertNotNull(queueManager.getLeafQueue("root.queue1", false)); + assertNull(queueManager.getParentQueue("root.queue1", false)); + + // Now lets assume apps completed and there are no apps in queue1 + notEmptyQueues.clear(); + // We should see queue1 transform from leaf queue to parent queue. + queueManager.updateAllocationConfiguration(allocConf); + assertNull(queueManager.getLeafQueue("root.queue1", false)); + assertNotNull(queueManager.getParentQueue("root.queue1", false)); + // this parent should not have any children + assertTrue(queueManager.getParentQueue("root.queue1", false) + .getChildQueues().isEmpty()); + } + + private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) { + AllocationConfiguration allocConf = new AllocationConfiguration(conf); + allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues)); queueMgr.updateAllocationConfiguration(allocConf); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java index fd807c9d7e1..640d771f2c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java @@ -17,8 +17,11 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.Set; import javax.xml.parsers.DocumentBuilder; @@ -28,16 +31,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; import org.w3c.dom.Document; import org.w3c.dom.Element; -import com.google.common.collect.Sets; - public class TestQueuePlacementPolicy { private final static Configuration conf = new Configuration(); - private final static Set configuredQueues = Sets.newHashSet("root.someuser"); + private Map> configuredQueues; @BeforeClass public static void setup() { @@ -45,6 +47,14 @@ public static void setup() { SimpleGroupsMapping.class, GroupMappingServiceProvider.class); } + @Before + public void initTest() { + configuredQueues = new HashMap>(); + for (FSQueueType type : FSQueueType.values()) { + configuredQueues.put(type, new HashSet()); + } + } + @Test public void testSpecifiedUserPolicy() throws Exception { StringBuffer sb = new StringBuffer(); @@ -53,9 +63,12 @@ public void testSpecifiedUserPolicy() throws Exception { sb.append(" "); sb.append(""); QueuePlacementPolicy policy = parse(sb.toString()); - assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser")); - assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser")); - assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser")); + assertEquals("root.specifiedq", + policy.assignAppToQueue("specifiedq", "someuser")); + assertEquals("root.someuser", + policy.assignAppToQueue("default", "someuser")); + assertEquals("root.otheruser", + policy.assignAppToQueue("default", "otheruser")); } @Test @@ -66,6 +79,8 @@ public void testNoCreate() throws Exception { sb.append(" "); sb.append(" "); sb.append(""); + + configuredQueues.get(FSQueueType.LEAF).add("root.someuser"); QueuePlacementPolicy policy = parse(sb.toString()); assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser")); assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser")); @@ -81,7 +96,8 @@ public void testSpecifiedThenReject() throws Exception { sb.append(" "); sb.append(""); QueuePlacementPolicy policy = parse(sb.toString()); - assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser")); + assertEquals("root.specifiedq", + policy.assignAppToQueue("specifiedq", "someuser")); assertEquals(null, policy.assignAppToQueue("default", "someuser")); } @@ -117,10 +133,188 @@ public void testTerminals() throws Exception { parse(sb.toString()); } + @Test + public void testNestedUserQueueParsingErrors() { + // No nested rule specified in hierarchical user queue + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + assertIfExceptionThrown(sb); + + // Specified nested rule is not a QueuePlacementRule + sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + assertIfExceptionThrown(sb); + } + + private void assertIfExceptionThrown(StringBuffer sb) { + Throwable th = null; + try { + parse(sb.toString()); + } catch (Exception e) { + th = e; + } + + assertTrue(th instanceof AllocationConfigurationException); + } + + @Test + public void testNestedUserQueueParsing() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + Throwable th = null; + try { + parse(sb.toString()); + } catch (Exception e) { + th = e; + } + + assertNull(th); + } + + @Test + public void testNestedUserQueuePrimaryGroup() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + // User queue would be created under primary group queue + QueuePlacementPolicy policy = parse(sb.toString()); + assertEquals("root.user1group.user1", + policy.assignAppToQueue("root.default", "user1")); + // Other rules above and below hierarchical user queue rule should work as + // usual + configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq"); + // test if specified rule(above nestedUserQueue rule) works ok + assertEquals("root.specifiedq", + policy.assignAppToQueue("root.specifiedq", "user2")); + + // test if default rule(below nestedUserQueue rule) works + configuredQueues.get(FSQueueType.LEAF).add("root.user3group"); + assertEquals("root.default", + policy.assignAppToQueue("root.default", "user3")); + } + + @Test + public void testNestedUserQueuePrimaryGroupNoCreate() throws Exception { + // Primary group rule has create='false' + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + QueuePlacementPolicy policy = parse(sb.toString()); + + // Should return root.default since primary group 'root.user1group' is not + // configured + assertEquals("root.default", + policy.assignAppToQueue("root.default", "user1")); + + // Let's configure primary group and check if user queue is created + configuredQueues.get(FSQueueType.PARENT).add("root.user1group"); + policy = parse(sb.toString()); + assertEquals("root.user1group.user1", + policy.assignAppToQueue("root.default", "user1")); + + // Both Primary group and nestedUserQueue rule has create='false' + sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + // Should return root.default since primary group and user queue for user 2 + // are not configured. + assertEquals("root.default", + policy.assignAppToQueue("root.default", "user2")); + + // Now configure both primary group and the user queue for user2 + configuredQueues.get(FSQueueType.PARENT).add("root.user2group"); + configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2"); + policy = parse(sb.toString()); + + assertEquals("root.user2group.user2", + policy.assignAppToQueue("root.default", "user2")); + } + + @Test + public void testNestedUserQueueSecondaryGroup() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + QueuePlacementPolicy policy = parse(sb.toString()); + // Should return root.default since secondary groups are not configured + assertEquals("root.default", + policy.assignAppToQueue("root.default", "user1")); + + // configure secondary group for user1 + configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1"); + policy = parse(sb.toString()); + // user queue created should be created under secondary group + assertEquals("root.user1subgroup1.user1", + policy.assignAppToQueue("root.default", "user1")); + } + + @Test + public void testNestedUserQueueSpecificRule() throws Exception { + // This test covers the use case where users can specify different parent + // queues and want user queues under those. + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + + // Let's create couple of parent queues + configuredQueues.get(FSQueueType.PARENT).add("root.parent1"); + configuredQueues.get(FSQueueType.PARENT).add("root.parent2"); + + QueuePlacementPolicy policy = parse(sb.toString()); + assertEquals("root.parent1.user1", + policy.assignAppToQueue("root.parent1", "user1")); + assertEquals("root.parent2.user2", + policy.assignAppToQueue("root.parent2", "user2")); + } + private QueuePlacementPolicy parse(String str) throws Exception { // Read and parse the allocations file. - DocumentBuilderFactory docBuilderFactory = - DocumentBuilderFactory.newInstance(); + DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory + .newInstance(); docBuilderFactory.setIgnoringComments(true); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); Document doc = builder.parse(IOUtils.toInputStream(str)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 32bb0b80dda..02890a1c357 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -206,8 +206,10 @@ Allocation file format The allocation file must be in XML format. The format contains five types of elements: - * <>, which represent queues. Each may contain the following - properties: + * <>, which represent queues. Queue elements can take an optional + attribute ’type’,which when set to ‘parent’ makes it a parent queue. This is useful + when we want to create a parent queue without configuring any leaf queues. + Each queue element may contain the following properties: * minResources: minimum resources the queue is entitled to, in the form "X mb, Y vcores". For the single-resource fairness policy, the vcores @@ -299,6 +301,15 @@ Allocation file format that matches a secondary group of the user who submitted it. The first secondary group that matches a configured queue will be selected. + * nestedUserQueue : the app is placed into a queue with the name of the user + under the queue suggested by the nested rule. This is similar to ‘user’ + rule,the difference being in ‘nestedUserQueue’ rule,user queues can be created + under any parent queue, while ‘user’ rule creates user queues only under root queue. + Note that nestedUserQueue rule would be applied only if the nested rule returns a + parent queue.One can configure a parent queue either by setting ‘type’ attribute of queue + to ‘parent’ or by configuring at least one leaf under that queue which makes it a parent. + See example allocation for a sample use case. + * default: the app is placed into the queue named "default". * reject: the app is rejected. @@ -319,6 +330,12 @@ Allocation file format 5000 mb,0vcores + + + + 3.0 + 30 @@ -328,6 +345,9 @@ Allocation file format + + +