YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1593192 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-05-08 07:23:19 +00:00
parent 868e9b6431
commit 4518a0db27
12 changed files with 687 additions and 134 deletions

View File

@ -8,6 +8,9 @@ Release 2.5.0 - UNRELEASED
YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha) YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha)
YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar
via Sandy Ryza)
IMPROVEMENTS IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via

View File

@ -77,19 +77,22 @@ public class AllocationConfiguration {
@VisibleForTesting @VisibleForTesting
QueuePlacementPolicy placementPolicy; QueuePlacementPolicy placementPolicy;
//Configured queues in the alloc xml
@VisibleForTesting @VisibleForTesting
Set<String> queueNames; Map<FSQueueType, Set<String>> configuredQueues;
public AllocationConfiguration(Map<String, Resource> minQueueResources, public AllocationConfiguration(Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps, Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault, Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
int queueMaxAppsDefault, Map<String, SchedulingPolicy> schedulingPolicies, int queueMaxAppsDefault,
Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy, SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
QueuePlacementPolicy placementPolicy, Set<String> queueNames) { QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues) {
this.minQueueResources = minQueueResources; this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources; this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps; this.queueMaxApps = queueMaxApps;
@ -104,7 +107,7 @@ public AllocationConfiguration(Map<String, Resource> minQueueResources,
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout; this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.placementPolicy = placementPolicy; this.placementPolicy = placementPolicy;
this.queueNames = queueNames; this.configuredQueues = configuredQueues;
} }
public AllocationConfiguration(Configuration conf) { public AllocationConfiguration(Configuration conf) {
@ -121,9 +124,12 @@ public AllocationConfiguration(Configuration conf) {
fairSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE;
schedulingPolicies = new HashMap<String, SchedulingPolicy>(); schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
configuredQueues = new HashMap<FSQueueType, Set<String>>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
}
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
new HashSet<String>()); configuredQueues);
queueNames = new HashSet<String>();
} }
/** /**
@ -221,8 +227,8 @@ public SchedulingPolicy getDefaultSchedulingPolicy() {
return defaultSchedulingPolicy; return defaultSchedulingPolicy;
} }
public Set<String> getQueueNames() { public Map<FSQueueType, Set<String>> getConfiguredQueues() {
return queueNames; return configuredQueues;
} }
public QueuePlacementPolicy getPlacementPolicy() { public QueuePlacementPolicy getPlacementPolicy() {

View File

@ -214,7 +214,14 @@ public synchronized void reloadAllocations() throws IOException,
QueuePlacementPolicy newPlacementPolicy = null; QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc. // Remember all queue names so we can display them on web UI, etc.
Set<String> queueNamesInAllocFile = new HashSet<String>(); // configuredQueues is segregated based on whether it is a leaf queue
// or a parent queue. This information is used for creating queues
// and also for making queue placement decisions(QueuePlacementRule.java).
Map<FSQueueType, Set<String>> configuredQueues =
new HashMap<FSQueueType, Set<String>>();
for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>());
}
// Read and parse the allocations file. // Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory docBuilderFactory =
@ -289,26 +296,27 @@ public synchronized void reloadAllocations() throws IOException,
} }
parent = null; parent = null;
} }
loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, loadQueue(parent, element, minQueueResources, maxQueueResources,
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, queueMaxApps, userMaxApps, queueWeights, queuePolicies,
queueAcls, queueNamesInAllocFile); minSharePreemptionTimeouts, queueAcls,
configuredQueues);
} }
// Load placement policy and pass it configured queues // Load placement policy and pass it configured queues
Configuration conf = getConfig(); Configuration conf = getConfig();
if (placementPolicyElement != null) { if (placementPolicyElement != null) {
newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
queueNamesInAllocFile, conf); configuredQueues, conf);
} else { } else {
newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
queueNamesInAllocFile); configuredQueues);
} }
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
newPlacementPolicy, queueNamesInAllocFile); newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime(); lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false; lastReloadAttemptFailed = false;
@ -324,7 +332,8 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights, Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies, Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile) Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException { throws AllocationConfigurationException {
String queueName = element.getAttribute("name"); String queueName = element.getAttribute("name");
if (parentName != null) { if (parentName != null) {
@ -375,13 +384,19 @@ private void loadQueue(String parentName, Element element, Map<String, Resource>
"pool".equals(field.getTagName())) { "pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources, loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queuePolicies, queueMaxApps, userMaxApps, queueWeights, queuePolicies,
minSharePreemptionTimeouts, minSharePreemptionTimeouts, queueAcls, configuredQueues);
queueAcls, queueNamesInAllocFile); configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false; isLeaf = false;
} }
} }
if (isLeaf) { if (isLeaf) {
queueNamesInAllocFile.add(queueName); // if a leaf in the alloc file is marked as type='parent'
// then store it under 'parent'
if ("parent".equals(element.getAttribute("type"))) {
configuredQueues.get(FSQueueType.PARENT).add(queueName);
} else {
configuredQueues.get(FSQueueType.LEAF).add(queueName);
}
} }
queueAcls.put(queueName, acls); queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)

View File

@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
public enum FSQueueType {
/*
* Represents a leaf queue
*/
LEAF,
/*
* Represents a parent queue
*/
PARENT
}

View File

@ -74,7 +74,7 @@ public void initialize(Configuration conf) throws IOException,
} }
/** /**
* Get a queue by name, creating it if the create param is true and is necessary. * Get a leaf queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a * If the queue is not or can not be a leaf queue, i.e. it already exists as a
* parent queue, or one of the parents in its name is already a leaf queue, * parent queue, or one of the parents in its name is already a leaf queue,
* null is returned. * null is returned.
@ -85,31 +85,53 @@ public void initialize(Configuration conf) throws IOException,
* could be referred to as just "parent1.queue2". * could be referred to as just "parent1.queue2".
*/ */
public FSLeafQueue getLeafQueue(String name, boolean create) { public FSLeafQueue getLeafQueue(String name, boolean create) {
name = ensureRootPrefix(name); FSQueue queue = getQueue(name, create, FSQueueType.LEAF);
synchronized (queues) { if (queue instanceof FSParentQueue) {
FSQueue queue = queues.get(name);
if (queue == null && create) {
FSLeafQueue leafQueue = createLeafQueue(name);
if (leafQueue == null) {
return null;
}
queue = leafQueue;
} else if (queue instanceof FSParentQueue) {
return null; return null;
} }
return (FSLeafQueue) queue; return (FSLeafQueue) queue;
} }
/**
* Get a parent queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a parent queue, i.e. it already exists as a
* leaf queue, or one of the parents in its name is already a leaf queue,
* null is returned.
*
* The root part of the name is optional, so a queue underneath the root
* named "queue1" could be referred to as just "queue1", and a queue named
* "queue2" underneath a parent named "parent1" that is underneath the root
* could be referred to as just "parent1.queue2".
*/
public FSParentQueue getParentQueue(String name, boolean create) {
FSQueue queue = getQueue(name, create, FSQueueType.PARENT);
if (queue instanceof FSLeafQueue) {
return null;
}
return (FSParentQueue) queue;
}
private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
name = ensureRootPrefix(name);
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null && create) {
// if the queue doesn't exist,create it and return
queue = createQueue(name, queueType);
}
return queue;
}
} }
/** /**
* Creates a leaf queue and places it in the tree. Creates any * Creates a leaf or parent queue based on what is specified in 'queueType'
* parents that don't already exist. * and places it in the tree. Creates any parents that don't already exist.
* *
* @return * @return
* the created queue, if successful. null if not allowed (one of the parent * the created queue, if successful. null if not allowed (one of the parent
* queues in the queue name is already a leaf queue) * queues in the queue name is already a leaf queue)
*/ */
private FSLeafQueue createLeafQueue(String name) { private FSQueue createQueue(String name, FSQueueType queueType) {
List<String> newQueueNames = new ArrayList<String>(); List<String> newQueueNames = new ArrayList<String>();
newQueueNames.add(name); newQueueNames.add(name);
int sepIndex = name.length(); int sepIndex = name.length();
@ -143,8 +165,7 @@ private FSLeafQueue createLeafQueue(String name) {
FSLeafQueue leafQueue = null; FSLeafQueue leafQueue = null;
for (int i = newQueueNames.size()-1; i >= 0; i--) { for (int i = newQueueNames.size()-1; i >= 0; i--) {
String queueName = newQueueNames.get(i); String queueName = newQueueNames.get(i);
if (i == 0) { if (i == 0 && queueType != FSQueueType.PARENT) {
// First name added was the leaf queue
leafQueue = new FSLeafQueue(name, scheduler, parent); leafQueue = new FSLeafQueue(name, scheduler, parent);
try { try {
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy()); leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
@ -155,6 +176,7 @@ private FSLeafQueue createLeafQueue(String name) {
parent.addChildQueue(leafQueue); parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue); queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue); leafQueues.add(leafQueue);
return leafQueue;
} else { } else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
try { try {
@ -169,53 +191,64 @@ private FSLeafQueue createLeafQueue(String name) {
} }
} }
return leafQueue; return parent;
} }
/** /**
* Make way for the given leaf queue if possible, by removing incompatible * Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to * queues with no apps in them. Incompatibility could be due to
* (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in * (1) queueToCreate being currently a parent but needs to change to leaf
* the ancestry of leafToCreate. * (2) queueToCreate being currently a leaf but needs to change to parent
* (3) an existing leaf queue in the ancestry of queueToCreate.
* *
* We will never remove the root queue or the default queue in this way. * We will never remove the root queue or the default queue in this way.
* *
* @return true if we can create leafToCreate or it already exists. * @return true if we can create queueToCreate or it already exists.
*/ */
private boolean removeEmptyIncompatibleQueues(String leafToCreate) { private boolean removeEmptyIncompatibleQueues(String queueToCreate,
leafToCreate = ensureRootPrefix(leafToCreate); FSQueueType queueType) {
queueToCreate = ensureRootPrefix(queueToCreate);
// Ensure leafToCreate is not root and doesn't have the default queue in its // Ensure queueToCreate is not root and doesn't have the default queue in its
// ancestry. // ancestry.
if (leafToCreate.equals(ROOT_QUEUE) || if (queueToCreate.equals(ROOT_QUEUE) ||
leafToCreate.startsWith( queueToCreate.startsWith(
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) { ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
return false; return false;
} }
FSQueue queue = queues.get(leafToCreate); FSQueue queue = queues.get(queueToCreate);
// Queue exists already. // Queue exists already.
if (queue != null) { if (queue != null) {
if (queue instanceof FSLeafQueue) { if (queue instanceof FSLeafQueue) {
// If it's an already existing leaf, we're ok. if (queueType == FSQueueType.LEAF) {
// if queue is already a leaf then return true
return true; return true;
}
// remove incompatibility since queue is a leaf currently
// needs to change to a parent.
return removeQueueIfEmpty(queue);
} else { } else {
// If it's an existing parent queue, remove it if it's empty. if (queueType == FSQueueType.PARENT) {
return true;
}
// If it's an existing parent queue and needs to change to leaf,
// remove it if it's empty.
return removeQueueIfEmpty(queue); return removeQueueIfEmpty(queue);
} }
} }
// Queue doesn't exist already. Check if the new queue would be created // Queue doesn't exist already. Check if the new queue would be created
// under an existing leaf queue. If so, try removing that leaf queue. // under an existing leaf queue. If so, try removing that leaf queue.
int sepIndex = leafToCreate.length(); int sepIndex = queueToCreate.length();
sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1); sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
while (sepIndex != -1) { while (sepIndex != -1) {
String prefixString = leafToCreate.substring(0, sepIndex); String prefixString = queueToCreate.substring(0, sepIndex);
FSQueue prefixQueue = queues.get(prefixString); FSQueue prefixQueue = queues.get(prefixString);
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) { if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
return removeQueueIfEmpty(prefixQueue); return removeQueueIfEmpty(prefixQueue);
} }
sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1); sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
} }
return true; return true;
} }
@ -312,13 +345,22 @@ private String ensureRootPrefix(String name) {
} }
public void updateAllocationConfiguration(AllocationConfiguration queueConf) { public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
// Make sure all queues exist // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
for (String name : queueConf.getQueueNames()) { for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
if (removeEmptyIncompatibleQueues(name)) { if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
getLeafQueue(name, true); getLeafQueue(name, true);
} }
} }
// At this point all leaves and 'parents with at least one child' would have been created.
// Now create parents with no configured leaf.
for (String name : queueConf.getConfiguredQueues().get(
FSQueueType.PARENT)) {
if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
getParentQueue(name, true);
}
}
for (FSQueue queue : queues.values()) { for (FSQueue queue : queues.values()) {
// Update queue metrics // Update queue metrics
FSQueueMetrics queueMetrics = queue.getMetrics(); FSQueueMetrics queueMetrics = queue.getMetrics();

View File

@ -42,17 +42,19 @@ public class QueuePlacementPolicy {
map.put("secondaryGroupExistingQueue", map.put("secondaryGroupExistingQueue",
QueuePlacementRule.SecondaryGroupExistingQueue.class); QueuePlacementRule.SecondaryGroupExistingQueue.class);
map.put("specified", QueuePlacementRule.Specified.class); map.put("specified", QueuePlacementRule.Specified.class);
map.put("nestedUserQueue",
QueuePlacementRule.NestedUserQueue.class);
map.put("default", QueuePlacementRule.Default.class); map.put("default", QueuePlacementRule.Default.class);
map.put("reject", QueuePlacementRule.Reject.class); map.put("reject", QueuePlacementRule.Reject.class);
ruleClasses = Collections.unmodifiableMap(map); ruleClasses = Collections.unmodifiableMap(map);
} }
private final List<QueuePlacementRule> rules; private final List<QueuePlacementRule> rules;
private final Set<String> configuredQueues; private final Map<FSQueueType, Set<String>> configuredQueues;
private final Groups groups; private final Groups groups;
public QueuePlacementPolicy(List<QueuePlacementRule> rules, public QueuePlacementPolicy(List<QueuePlacementRule> rules,
Set<String> configuredQueues, Configuration conf) Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
throws AllocationConfigurationException { throws AllocationConfigurationException {
for (int i = 0; i < rules.size()-1; i++) { for (int i = 0; i < rules.size()-1; i++) {
if (rules.get(i).isTerminal()) { if (rules.get(i).isTerminal()) {
@ -72,19 +74,35 @@ public QueuePlacementPolicy(List<QueuePlacementRule> rules,
/** /**
* Builds a QueuePlacementPolicy from an xml element. * Builds a QueuePlacementPolicy from an xml element.
*/ */
public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues, public static QueuePlacementPolicy fromXml(Element el,
Configuration conf) throws AllocationConfigurationException { Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
throws AllocationConfigurationException {
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>(); List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
NodeList elements = el.getChildNodes(); NodeList elements = el.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) { for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i); Node node = elements.item(i);
if (node instanceof Element) { if (node instanceof Element) {
QueuePlacementRule rule = createAndInitializeRule(node);
rules.add(rule);
}
}
return new QueuePlacementPolicy(rules, configuredQueues, conf);
}
/**
* Create and initialize a rule given a xml node
* @param node
* @return QueuePlacementPolicy
* @throws AllocationConfigurationException
*/
public static QueuePlacementRule createAndInitializeRule(Node node)
throws AllocationConfigurationException {
Element element = (Element) node; Element element = (Element) node;
String ruleName = element.getAttribute("name"); String ruleName = element.getAttribute("name");
if ("".equals(ruleName)) { if ("".equals(ruleName)) {
throw new AllocationConfigurationException("No name provided for a " + throw new AllocationConfigurationException("No name provided for a "
"rule element"); + "rule element");
} }
Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName); Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
@ -94,10 +112,7 @@ public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQue
} }
QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
rule.initializeFromXml(element); rule.initializeFromXml(element);
rules.add(rule); return rule;
}
}
return new QueuePlacementPolicy(rules, configuredQueues, conf);
} }
/** /**
@ -105,7 +120,7 @@ public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQue
* user-as-default-queue configuration options. * user-as-default-queue configuration options.
*/ */
public static QueuePlacementPolicy fromConfiguration(Configuration conf, public static QueuePlacementPolicy fromConfiguration(Configuration conf,
Set<String> configuredQueues) { Map<FSQueueType, Set<String>> configuredQueues) {
boolean create = conf.getBoolean( boolean create = conf.getBoolean(
FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS); FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);

View File

@ -18,16 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException; import java.io.IOException;
import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.w3c.dom.Element; import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap; import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node; import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import com.google.common.annotations.VisibleForTesting;
public abstract class QueuePlacementRule { public abstract class QueuePlacementRule {
protected boolean create; protected boolean create;
@ -58,16 +61,20 @@ public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
* continue to the next rule, and null indicates that the app should be rejected. * continue to the next rule, and null indicates that the app should be rejected.
*/ */
public String assignAppToQueue(String requestedQueue, String user, public String assignAppToQueue(String requestedQueue, String user,
Groups groups, Collection<String> configuredQueues) throws IOException { Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues); throws IOException {
if (create || configuredQueues.contains(queue)) { String queue = getQueueForApp(requestedQueue, user, groups,
configuredQueues);
if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
|| configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
return queue; return queue;
} else { } else {
return ""; return "";
} }
} }
public void initializeFromXml(Element el) { public void initializeFromXml(Element el)
throws AllocationConfigurationException {
boolean create = true; boolean create = true;
NamedNodeMap attributes = el.getAttributes(); NamedNodeMap attributes = el.getAttributes();
Map<String, String> args = new HashMap<String, String>(); Map<String, String> args = new HashMap<String, String>();
@ -104,15 +111,16 @@ public void initializeFromXml(Element el) {
* continue to the next rule. * continue to the next rule.
*/ */
protected abstract String getQueueForApp(String requestedQueue, String user, protected abstract String getQueueForApp(String requestedQueue, String user,
Groups groups, Collection<String> configuredQueues) throws IOException; Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException;
/** /**
* Places apps in queues by username of the submitter * Places apps in queues by username of the submitter
*/ */
public static class User extends QueuePlacementRule { public static class User extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue, String user,
String user, Groups groups, Collection<String> configuredQueues) { Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return "root." + user; return "root." + user;
} }
@ -127,9 +135,9 @@ public boolean isTerminal() {
*/ */
public static class PrimaryGroup extends QueuePlacementRule { public static class PrimaryGroup extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue, String user,
String user, Groups groups, Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
Collection<String> configuredQueues) throws IOException { throws IOException {
return "root." + groups.getGroups(user).get(0); return "root." + groups.getGroups(user).get(0);
} }
@ -147,12 +155,15 @@ public boolean isTerminal() {
*/ */
public static class SecondaryGroupExistingQueue extends QueuePlacementRule { public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue, String user,
String user, Groups groups, Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
Collection<String> configuredQueues) throws IOException { throws IOException {
List<String> groupNames = groups.getGroups(user); List<String> groupNames = groups.getGroups(user);
for (int i = 1; i < groupNames.size(); i++) { for (int i = 1; i < groupNames.size(); i++) {
if (configuredQueues.contains("root." + groupNames.get(i))) { String group = groupNames.get(i);
if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
|| configuredQueues.get(FSQueueType.PARENT).contains(
"root." + group)) {
return "root." + groupNames.get(i); return "root." + groupNames.get(i);
} }
} }
@ -166,13 +177,84 @@ public boolean isTerminal() {
} }
} }
/**
* Places apps in queues with name of the submitter under the queue
* returned by the nested rule.
*/
public static class NestedUserQueue extends QueuePlacementRule {
@VisibleForTesting
QueuePlacementRule nestedRule;
/**
* Parse xml and instantiate the nested rule
*/
@Override
public void initializeFromXml(Element el)
throws AllocationConfigurationException {
NodeList elements = el.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
Element element = (Element) node;
if ("rule".equals(element.getTagName())) {
QueuePlacementRule rule = QueuePlacementPolicy
.createAndInitializeRule(node);
if (rule == null) {
throw new AllocationConfigurationException(
"Unable to create nested rule in nestedUserQueue rule");
}
this.nestedRule = rule;
break;
} else {
continue;
}
}
}
if (this.nestedRule == null) {
throw new AllocationConfigurationException(
"No nested rule specified in <nestedUserQueue> rule");
}
super.initializeFromXml(el);
}
@Override
protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
throws IOException {
// Apply the nested rule
String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
groups, configuredQueues);
if (queueName != null && queueName != "") {
if (!queueName.startsWith("root.")) {
queueName = "root." + queueName;
}
// Verify if the queue returned by the nested rule is an configured leaf queue,
// if yes then skip to next rule in the queue placement policy
if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
return "";
}
return queueName + "." + user;
}
return queueName;
}
@Override
public boolean isTerminal() {
return false;
}
}
/** /**
* Places apps in queues by requested queue of the submitter * Places apps in queues by requested queue of the submitter
*/ */
public static class Specified extends QueuePlacementRule { public static class Specified extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue, String user,
String user, Groups groups, Collection<String> configuredQueues) { Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) { if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
return ""; return "";
} else { } else {
@ -195,7 +277,7 @@ public boolean isTerminal() {
public static class Default extends QueuePlacementRule { public static class Default extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, String user, protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Collection<String> configuredQueues) { Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
} }
@ -211,13 +293,13 @@ public boolean isTerminal() {
public static class Reject extends QueuePlacementRule { public static class Reject extends QueuePlacementRule {
@Override @Override
public String assignAppToQueue(String requestedQueue, String user, public String assignAppToQueue(String requestedQueue, String user,
Groups groups, Collection<String> configuredQueues) { Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return null; return null;
} }
@Override @Override
protected String getQueueForApp(String requestedQueue, String user, protected String getQueueForApp(String requestedQueue, String user,
Groups groups, Collection<String> configuredQueues) { Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.*; import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import java.io.File; import java.io.File;
import java.io.FileWriter; import java.io.FileWriter;
@ -28,6 +27,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.Clock;
@ -99,9 +99,12 @@ public void testReload() throws Exception {
assertEquals(1, rules.size()); assertEquals(1, rules.size());
assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass()); assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
assertEquals(1, allocConf.getQueueMaxApps("root.queueA")); assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
assertEquals(2, allocConf.getQueueNames().size()); assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
assertTrue(allocConf.getQueueNames().contains("root.queueA")); .size());
assertTrue(allocConf.getQueueNames().contains("root.queueB")); assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueA"));
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueB"));
confHolder.allocConf = null; confHolder.allocConf = null;
@ -114,6 +117,9 @@ public void testReload() throws Exception {
out.println(" </queue>"); out.println(" </queue>");
out.println(" <queuePlacementPolicy>"); out.println(" <queuePlacementPolicy>");
out.println(" <rule name='specified' />"); out.println(" <rule name='specified' />");
out.println(" <rule name='nestedUserQueue' >");
out.println(" <rule name='primaryGroup' />");
out.println(" </rule>");
out.println(" <rule name='default' />"); out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>"); out.println(" </queuePlacementPolicy>");
out.println("</allocations>"); out.println("</allocations>");
@ -131,12 +137,18 @@ public void testReload() throws Exception {
allocConf = confHolder.allocConf; allocConf = confHolder.allocConf;
policy = allocConf.getPlacementPolicy(); policy = allocConf.getPlacementPolicy();
rules = policy.getRules(); rules = policy.getRules();
assertEquals(2, rules.size()); assertEquals(3, rules.size());
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass()); assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass()); assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
.getClass());
assertEquals(QueuePlacementRule.PrimaryGroup.class,
((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
assertEquals(3, allocConf.getQueueMaxApps("root.queueB")); assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
assertEquals(1, allocConf.getQueueNames().size()); assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
assertTrue(allocConf.getQueueNames().contains("root.queueB")); .size());
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueB"));
} }
@Test @Test
@ -170,6 +182,14 @@ public void testAllocationFileParsing() throws Exception {
out.println("<queue name=\"queueE\">"); out.println("<queue name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>"); out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("</queue>"); out.println("</queue>");
//Make queue F a parent queue without configured leaf queues using the 'type' attribute
out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("</queue>");
//Create hierarchical queues G,H
out.println("<queue name=\"queueG\">");
out.println(" <queue name=\"queueH\">");
out.println(" </queue>");
out.println("</queue>");
// Set default limit of apps per queue to 15 // Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>"); out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5 // Set default limit of apps per user to 5
@ -194,7 +214,7 @@ public void testAllocationFileParsing() throws Exception {
allocLoader.reloadAllocations(); allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf; AllocationConfiguration queueConf = confHolder.allocConf;
assertEquals(5, queueConf.getQueueNames().size()); assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
@ -250,6 +270,14 @@ public void testAllocationFileParsing() throws Exception {
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
assertTrue(queueConf.getConfiguredQueues()
.get(FSQueueType.PARENT)
.contains("root.queueF"));
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
.contains("root.queueG"));
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
.contains("root.queueG.queueH"));
// Verify existing queues have default scheduling policy // Verify existing queues have default scheduling policy
assertEquals(DominantResourceFairnessPolicy.NAME, assertEquals(DominantResourceFairnessPolicy.NAME,
queueConf.getSchedulingPolicy("root").getName()); queueConf.getSchedulingPolicy("root").getName());
@ -315,7 +343,7 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
allocLoader.reloadAllocations(); allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf; AllocationConfiguration queueConf = confHolder.allocConf;
assertEquals(5, queueConf.getQueueNames().size()); assertEquals(5, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),

View File

@ -34,6 +34,7 @@
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -43,7 +44,6 @@
import javax.xml.parsers.ParserConfigurationException; import javax.xml.parsers.ParserConfigurationException;
import org.junit.Assert; import org.junit.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -70,6 +70,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@ -737,8 +738,11 @@ public void testQueuePlacementWithPolicy() throws Exception {
rules.add(new QueuePlacementRule.Default().initialize(true, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null));
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group", Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2"); "root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
configuredQueues.put(FSQueueType.LEAF, queues);
configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
scheduler.getAllocationConfiguration().placementPolicy = scheduler.getAllocationConfiguration().placementPolicy =
new QueuePlacementPolicy(rules, queues, conf); new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1"); appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName()); assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user1"); appId = createSchedulingRequest(1024, "default", "user1");
@ -758,7 +762,7 @@ public void testQueuePlacementWithPolicy() throws Exception {
rules.add(new QueuePlacementRule.Specified().initialize(true, null)); rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null));
scheduler.getAllocationConfiguration().placementPolicy = scheduler.getAllocationConfiguration().placementPolicy =
new QueuePlacementPolicy(rules, queues, conf); new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1"); appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName()); assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "somequeue", "otheruser"); appId = createSchedulingRequest(1024, "somequeue", "otheruser");
@ -810,6 +814,88 @@ else if (p.getName().equals("root.queueB")) {
} }
} }
@Test
public void testNestedUserQueue() throws IOException {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"user1group\" type=\"parent\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queuePlacementPolicy>");
out.println("<rule name=\"specified\" create=\"false\" />");
out.println("<rule name=\"nestedUserQueue\">");
out.println(" <rule name=\"primaryGroup\" create=\"false\" />");
out.println("</rule>");
out.println("<rule name=\"default\" />");
out.println("</queuePlacementPolicy>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
"user1");
assertEquals("root.user1group.user1", user1Leaf.getName());
}
@Test
public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"parentq\" type=\"parent\">");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queuePlacementPolicy>");
out.println("<rule name=\"nestedUserQueue\">");
out.println(" <rule name=\"specified\" create=\"false\" />");
out.println("</rule>");
out.println("<rule name=\"default\" />");
out.println("</queuePlacementPolicy>");
out.println("</allocations>");
out.close();
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
scheduler.reinitialize(conf, resourceManager.getRMContext());
int capacity = 16 * 1024;
// create node with 16 G
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity),
1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// user1,user2 submit their apps to parentq and create user queues
scheduler.assignToQueue(rmApp1, "root.parentq", "user1");
scheduler.assignToQueue(rmApp2, "root.parentq", "user2");
scheduler.update();
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
.getLeafQueues();
for (FSLeafQueue leaf : leafQueues) {
if (leaf.getName().equals("root.parentq.user1")
|| leaf.getName().equals("root.parentq.user2")) {
// assert that the fair share is 1/4th node1's capacity
assertEquals(capacity / 4, leaf.getFairShare().getMemory());
// assert weights are equal for both the user queues
assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
}
}
}
/** /**
* Make allocation requests and ensure they are reflected in queue demand. * Make allocation requests and ensure they are reflected in queue demand.
*/ */

View File

@ -17,8 +17,7 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.*;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import java.util.HashSet; import java.util.HashSet;
@ -57,45 +56,77 @@ public boolean isEmpty(FSQueue queue) {
@Test @Test
public void testReloadTurnsLeafQueueIntoParent() throws Exception { public void testReloadTurnsLeafQueueIntoParent() throws Exception {
updateConfiguredQueues(queueManager, "queue1"); updateConfiguredLeafQueues(queueManager, "queue1");
// When no apps are running in the leaf queue, should be fine turning it // When no apps are running in the leaf queue, should be fine turning it
// into a parent. // into a parent.
updateConfiguredQueues(queueManager, "queue1.queue2"); updateConfiguredLeafQueues(queueManager, "queue1.queue2");
assertNull(queueManager.getLeafQueue("queue1", false)); assertNull(queueManager.getLeafQueue("queue1", false));
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
// When leaf queues are empty, should be ok deleting them and // When leaf queues are empty, should be ok deleting them and
// turning parent into a leaf. // turning parent into a leaf.
updateConfiguredQueues(queueManager, "queue1"); updateConfiguredLeafQueues(queueManager, "queue1");
assertNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNull(queueManager.getLeafQueue("queue1.queue2", false));
assertNotNull(queueManager.getLeafQueue("queue1", false)); assertNotNull(queueManager.getLeafQueue("queue1", false));
// When apps exist in leaf queue, we shouldn't be able to create // When apps exist in leaf queue, we shouldn't be able to create
// children under it, but things should work otherwise. // children under it, but things should work otherwise.
notEmptyQueues.add(queueManager.getLeafQueue("queue1", false)); notEmptyQueues.add(queueManager.getLeafQueue("queue1", false));
updateConfiguredQueues(queueManager, "queue1.queue2"); updateConfiguredLeafQueues(queueManager, "queue1.queue2");
assertNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNull(queueManager.getLeafQueue("queue1.queue2", false));
assertNotNull(queueManager.getLeafQueue("queue1", false)); assertNotNull(queueManager.getLeafQueue("queue1", false));
// When apps exist in leaf queues under a parent queue, shouldn't be // When apps exist in leaf queues under a parent queue, shouldn't be
// able to turn it into a leaf queue, but things should work otherwise. // able to turn it into a leaf queue, but things should work otherwise.
notEmptyQueues.clear(); notEmptyQueues.clear();
updateConfiguredQueues(queueManager, "queue1.queue2"); updateConfiguredLeafQueues(queueManager, "queue1.queue2");
notEmptyQueues.add(queueManager.getQueue("root.queue1")); notEmptyQueues.add(queueManager.getQueue("root.queue1"));
updateConfiguredQueues(queueManager, "queue1"); updateConfiguredLeafQueues(queueManager, "queue1");
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false)); assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
assertNull(queueManager.getLeafQueue("queue1", false)); assertNull(queueManager.getLeafQueue("queue1", false));
// Should never to be able to create a queue under the default queue // Should never to be able to create a queue under the default queue
updateConfiguredQueues(queueManager, "default.queue3"); updateConfiguredLeafQueues(queueManager, "default.queue3");
assertNull(queueManager.getLeafQueue("default.queue3", false)); assertNull(queueManager.getLeafQueue("default.queue3", false));
assertNotNull(queueManager.getLeafQueue("default", false)); assertNotNull(queueManager.getLeafQueue("default", false));
} }
private void updateConfiguredQueues(QueueManager queueMgr, String... confQueues) { @Test
public void testReloadTurnsLeafToParentWithNoLeaf() {
AllocationConfiguration allocConf = new AllocationConfiguration(conf); AllocationConfiguration allocConf = new AllocationConfiguration(conf);
allocConf.queueNames = Sets.newHashSet(confQueues); // Create a leaf queue1
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1");
queueManager.updateAllocationConfiguration(allocConf);
assertNotNull(queueManager.getLeafQueue("root.queue1", false));
// Lets say later on admin makes queue1 a parent queue by
// specifying "type=parent" in the alloc xml and lets say apps running in
// queue1
notEmptyQueues.add(queueManager.getLeafQueue("root.queue1", false));
allocConf = new AllocationConfiguration(conf);
allocConf.configuredQueues.get(FSQueueType.PARENT)
.add("root.queue1");
// When allocs are reloaded queue1 shouldn't be converter to parent
queueManager.updateAllocationConfiguration(allocConf);
assertNotNull(queueManager.getLeafQueue("root.queue1", false));
assertNull(queueManager.getParentQueue("root.queue1", false));
// Now lets assume apps completed and there are no apps in queue1
notEmptyQueues.clear();
// We should see queue1 transform from leaf queue to parent queue.
queueManager.updateAllocationConfiguration(allocConf);
assertNull(queueManager.getLeafQueue("root.queue1", false));
assertNotNull(queueManager.getParentQueue("root.queue1", false));
// this parent should not have any children
assertTrue(queueManager.getParentQueue("root.queue1", false)
.getChildQueues().isEmpty());
}
private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) {
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
queueMgr.updateAllocationConfiguration(allocConf); queueMgr.updateAllocationConfiguration(allocConf);
} }
} }

View File

@ -17,8 +17,11 @@
*/ */
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.*;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set; import java.util.Set;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
@ -28,16 +31,15 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider; import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.junit.Before;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import org.w3c.dom.Document; import org.w3c.dom.Document;
import org.w3c.dom.Element; import org.w3c.dom.Element;
import com.google.common.collect.Sets;
public class TestQueuePlacementPolicy { public class TestQueuePlacementPolicy {
private final static Configuration conf = new Configuration(); private final static Configuration conf = new Configuration();
private final static Set<String> configuredQueues = Sets.newHashSet("root.someuser"); private Map<FSQueueType, Set<String>> configuredQueues;
@BeforeClass @BeforeClass
public static void setup() { public static void setup() {
@ -45,6 +47,14 @@ public static void setup() {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class); SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
} }
@Before
public void initTest() {
configuredQueues = new HashMap<FSQueueType, Set<String>>();
for (FSQueueType type : FSQueueType.values()) {
configuredQueues.put(type, new HashSet<String>());
}
}
@Test @Test
public void testSpecifiedUserPolicy() throws Exception { public void testSpecifiedUserPolicy() throws Exception {
StringBuffer sb = new StringBuffer(); StringBuffer sb = new StringBuffer();
@ -53,9 +63,12 @@ public void testSpecifiedUserPolicy() throws Exception {
sb.append(" <rule name='user' />"); sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>"); sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString()); QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser")); assertEquals("root.specifiedq",
assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser")); policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser")); assertEquals("root.someuser",
policy.assignAppToQueue("default", "someuser"));
assertEquals("root.otheruser",
policy.assignAppToQueue("default", "otheruser"));
} }
@Test @Test
@ -66,6 +79,8 @@ public void testNoCreate() throws Exception {
sb.append(" <rule name='user' create=\"false\" />"); sb.append(" <rule name='user' create=\"false\" />");
sb.append(" <rule name='default' />"); sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>"); sb.append("</queuePlacementPolicy>");
configuredQueues.get(FSQueueType.LEAF).add("root.someuser");
QueuePlacementPolicy policy = parse(sb.toString()); QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser")); assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser")); assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
@ -81,7 +96,8 @@ public void testSpecifiedThenReject() throws Exception {
sb.append(" <rule name='reject' />"); sb.append(" <rule name='reject' />");
sb.append("</queuePlacementPolicy>"); sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString()); QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser")); assertEquals("root.specifiedq",
policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals(null, policy.assignAppToQueue("default", "someuser")); assertEquals(null, policy.assignAppToQueue("default", "someuser"));
} }
@ -117,10 +133,188 @@ public void testTerminals() throws Exception {
parse(sb.toString()); parse(sb.toString());
} }
@Test
public void testNestedUserQueueParsingErrors() {
// No nested rule specified in hierarchical user queue
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'/>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
// Specified nested rule is not a QueuePlacementRule
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='unknownRule'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
assertIfExceptionThrown(sb);
}
private void assertIfExceptionThrown(StringBuffer sb) {
Throwable th = null;
try {
parse(sb.toString());
} catch (Exception e) {
th = e;
}
assertTrue(th instanceof AllocationConfigurationException);
}
@Test
public void testNestedUserQueueParsing() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
Throwable th = null;
try {
parse(sb.toString());
} catch (Exception e) {
th = e;
}
assertNull(th);
}
@Test
public void testNestedUserQueuePrimaryGroup() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='specified' create='false' />");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
// User queue would be created under primary group queue
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.user1group.user1",
policy.assignAppToQueue("root.default", "user1"));
// Other rules above and below hierarchical user queue rule should work as
// usual
configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq");
// test if specified rule(above nestedUserQueue rule) works ok
assertEquals("root.specifiedq",
policy.assignAppToQueue("root.specifiedq", "user2"));
// test if default rule(below nestedUserQueue rule) works
configuredQueues.get(FSQueueType.LEAF).add("root.user3group");
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user3"));
}
@Test
public void testNestedUserQueuePrimaryGroupNoCreate() throws Exception {
// Primary group rule has create='false'
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='primaryGroup' create='false'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
// Should return root.default since primary group 'root.user1group' is not
// configured
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user1"));
// Let's configure primary group and check if user queue is created
configuredQueues.get(FSQueueType.PARENT).add("root.user1group");
policy = parse(sb.toString());
assertEquals("root.user1group.user1",
policy.assignAppToQueue("root.default", "user1"));
// Both Primary group and nestedUserQueue rule has create='false'
sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue' create='false'>");
sb.append(" <rule name='primaryGroup' create='false'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
// Should return root.default since primary group and user queue for user 2
// are not configured.
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user2"));
// Now configure both primary group and the user queue for user2
configuredQueues.get(FSQueueType.PARENT).add("root.user2group");
configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2");
policy = parse(sb.toString());
assertEquals("root.user2group.user2",
policy.assignAppToQueue("root.default", "user2"));
}
@Test
public void testNestedUserQueueSecondaryGroup() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='secondaryGroupExistingQueue'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
// Should return root.default since secondary groups are not configured
assertEquals("root.default",
policy.assignAppToQueue("root.default", "user1"));
// configure secondary group for user1
configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1");
policy = parse(sb.toString());
// user queue created should be created under secondary group
assertEquals("root.user1subgroup1.user1",
policy.assignAppToQueue("root.default", "user1"));
}
@Test
public void testNestedUserQueueSpecificRule() throws Exception {
// This test covers the use case where users can specify different parent
// queues and want user queues under those.
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <rule name='nestedUserQueue'>");
sb.append(" <rule name='specified' create='false'/>");
sb.append(" </rule>");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
// Let's create couple of parent queues
configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
configuredQueues.get(FSQueueType.PARENT).add("root.parent2");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.parent1.user1",
policy.assignAppToQueue("root.parent1", "user1"));
assertEquals("root.parent2.user2",
policy.assignAppToQueue("root.parent2", "user2"));
}
private QueuePlacementPolicy parse(String str) throws Exception { private QueuePlacementPolicy parse(String str) throws Exception {
// Read and parse the allocations file. // Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
DocumentBuilderFactory.newInstance(); .newInstance();
docBuilderFactory.setIgnoringComments(true); docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = builder.parse(IOUtils.toInputStream(str)); Document doc = builder.parse(IOUtils.toInputStream(str));

View File

@ -206,8 +206,10 @@ Allocation file format
The allocation file must be in XML format. The format contains five types of The allocation file must be in XML format. The format contains five types of
elements: elements:
* <<Queue elements>>, which represent queues. Each may contain the following * <<Queue elements>>, which represent queues. Queue elements can take an optional
properties: attribute type,which when set to parent makes it a parent queue. This is useful
when we want to create a parent queue without configuring any leaf queues.
Each queue element may contain the following properties:
* minResources: minimum resources the queue is entitled to, in the form * minResources: minimum resources the queue is entitled to, in the form
"X mb, Y vcores". For the single-resource fairness policy, the vcores "X mb, Y vcores". For the single-resource fairness policy, the vcores
@ -299,6 +301,15 @@ Allocation file format
that matches a secondary group of the user who submitted it. The first that matches a secondary group of the user who submitted it. The first
secondary group that matches a configured queue will be selected. secondary group that matches a configured queue will be selected.
* nestedUserQueue : the app is placed into a queue with the name of the user
under the queue suggested by the nested rule. This is similar to user
rule,the difference being in nestedUserQueue rule,user queues can be created
under any parent queue, while user rule creates user queues only under root queue.
Note that nestedUserQueue rule would be applied only if the nested rule returns a
parent queue.One can configure a parent queue either by setting type attribute of queue
to parent or by configuring at least one leaf under that queue which makes it a parent.
See example allocation for a sample use case.
* default: the app is placed into the queue named "default". * default: the app is placed into the queue named "default".
* reject: the app is rejected. * reject: the app is rejected.
@ -320,6 +331,12 @@ Allocation file format
</queue> </queue>
</queue> </queue>
<!—- Queue secondary_group_queue is a parent queue and may have
user queues under it -—>
<queue name=“secondary_group_queue” type=“parent”>
<weight>3.0</weight>
</queue>
<user name="sample_user"> <user name="sample_user">
<maxRunningApps>30</maxRunningApps> <maxRunningApps>30</maxRunningApps>
</user> </user>
@ -328,6 +345,9 @@ Allocation file format
<queuePlacementPolicy> <queuePlacementPolicy>
<rule name="specified" /> <rule name="specified" />
<rule name="primaryGroup" create="false" /> <rule name="primaryGroup" create="false" />
<rule name=“nestedUserQueue”>
<rule name=“secondaryGroupExistingQueue” create=“false” />
</rule>
<rule name="default" /> <rule name="default" />
</queuePlacementPolicy> </queuePlacementPolicy>
</allocations> </allocations>