YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar via Sandy Ryza)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1593190 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1ff694081f
commit
cfc97a4e88
|
@ -23,6 +23,9 @@ Release 2.5.0 - UNRELEASED
|
|||
|
||||
YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha)
|
||||
|
||||
YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar
|
||||
via Sandy Ryza)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
|
||||
|
|
|
@ -77,19 +77,22 @@ public class AllocationConfiguration {
|
|||
@VisibleForTesting
|
||||
QueuePlacementPolicy placementPolicy;
|
||||
|
||||
//Configured queues in the alloc xml
|
||||
@VisibleForTesting
|
||||
Set<String> queueNames;
|
||||
Map<FSQueueType, Set<String>> configuredQueues;
|
||||
|
||||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||
Map<String, Resource> maxQueueResources,
|
||||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||
Map<String, Resource> maxQueueResources,
|
||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||
Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
|
||||
int queueMaxAppsDefault, Map<String, SchedulingPolicy> schedulingPolicies,
|
||||
int queueMaxAppsDefault,
|
||||
Map<String, SchedulingPolicy> schedulingPolicies,
|
||||
SchedulingPolicy defaultSchedulingPolicy,
|
||||
Map<String, Long> minSharePreemptionTimeouts,
|
||||
Map<String, Long> minSharePreemptionTimeouts,
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
|
||||
QueuePlacementPolicy placementPolicy, Set<String> queueNames) {
|
||||
QueuePlacementPolicy placementPolicy,
|
||||
Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
this.minQueueResources = minQueueResources;
|
||||
this.maxQueueResources = maxQueueResources;
|
||||
this.queueMaxApps = queueMaxApps;
|
||||
|
@ -104,7 +107,7 @@ public class AllocationConfiguration {
|
|||
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
|
||||
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
|
||||
this.placementPolicy = placementPolicy;
|
||||
this.queueNames = queueNames;
|
||||
this.configuredQueues = configuredQueues;
|
||||
}
|
||||
|
||||
public AllocationConfiguration(Configuration conf) {
|
||||
|
@ -121,9 +124,12 @@ public class AllocationConfiguration {
|
|||
fairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
|
||||
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
|
||||
configuredQueues = new HashMap<FSQueueType, Set<String>>();
|
||||
for (FSQueueType queueType : FSQueueType.values()) {
|
||||
configuredQueues.put(queueType, new HashSet<String>());
|
||||
}
|
||||
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
|
||||
new HashSet<String>());
|
||||
queueNames = new HashSet<String>();
|
||||
configuredQueues);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -221,8 +227,8 @@ public class AllocationConfiguration {
|
|||
return defaultSchedulingPolicy;
|
||||
}
|
||||
|
||||
public Set<String> getQueueNames() {
|
||||
return queueNames;
|
||||
public Map<FSQueueType, Set<String>> getConfiguredQueues() {
|
||||
return configuredQueues;
|
||||
}
|
||||
|
||||
public QueuePlacementPolicy getPlacementPolicy() {
|
||||
|
|
|
@ -214,8 +214,15 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
QueuePlacementPolicy newPlacementPolicy = null;
|
||||
|
||||
// Remember all queue names so we can display them on web UI, etc.
|
||||
Set<String> queueNamesInAllocFile = new HashSet<String>();
|
||||
|
||||
// configuredQueues is segregated based on whether it is a leaf queue
|
||||
// or a parent queue. This information is used for creating queues
|
||||
// and also for making queue placement decisions(QueuePlacementRule.java).
|
||||
Map<FSQueueType, Set<String>> configuredQueues =
|
||||
new HashMap<FSQueueType, Set<String>>();
|
||||
for (FSQueueType queueType : FSQueueType.values()) {
|
||||
configuredQueues.put(queueType, new HashSet<String>());
|
||||
}
|
||||
|
||||
// Read and parse the allocations file.
|
||||
DocumentBuilderFactory docBuilderFactory =
|
||||
DocumentBuilderFactory.newInstance();
|
||||
|
@ -289,26 +296,27 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
}
|
||||
parent = null;
|
||||
}
|
||||
loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
|
||||
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
||||
queueAcls, queueNamesInAllocFile);
|
||||
loadQueue(parent, element, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
|
||||
minSharePreemptionTimeouts, queueAcls,
|
||||
configuredQueues);
|
||||
}
|
||||
|
||||
// Load placement policy and pass it configured queues
|
||||
Configuration conf = getConfig();
|
||||
if (placementPolicyElement != null) {
|
||||
newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
|
||||
queueNamesInAllocFile, conf);
|
||||
configuredQueues, conf);
|
||||
} else {
|
||||
newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
|
||||
queueNamesInAllocFile);
|
||||
configuredQueues);
|
||||
}
|
||||
|
||||
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
|
||||
queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
|
||||
newPlacementPolicy, queueNamesInAllocFile);
|
||||
newPlacementPolicy, configuredQueues);
|
||||
|
||||
lastSuccessfulReload = clock.getTime();
|
||||
lastReloadAttemptFailed = false;
|
||||
|
@ -324,7 +332,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
|
||||
Map<String, SchedulingPolicy> queuePolicies,
|
||||
Map<String, Long> minSharePreemptionTimeouts,
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile)
|
||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||
Map<FSQueueType, Set<String>> configuredQueues)
|
||||
throws AllocationConfigurationException {
|
||||
String queueName = element.getAttribute("name");
|
||||
if (parentName != null) {
|
||||
|
@ -375,13 +384,19 @@ public class AllocationFileLoaderService extends AbstractService {
|
|||
"pool".equals(field.getTagName())) {
|
||||
loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
||||
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
|
||||
minSharePreemptionTimeouts,
|
||||
queueAcls, queueNamesInAllocFile);
|
||||
minSharePreemptionTimeouts, queueAcls, configuredQueues);
|
||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||
isLeaf = false;
|
||||
}
|
||||
}
|
||||
if (isLeaf) {
|
||||
queueNamesInAllocFile.add(queueName);
|
||||
// if a leaf in the alloc file is marked as type='parent'
|
||||
// then store it under 'parent'
|
||||
if ("parent".equals(element.getAttribute("type"))) {
|
||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||
} else {
|
||||
configuredQueues.get(FSQueueType.LEAF).add(queueName);
|
||||
}
|
||||
}
|
||||
queueAcls.put(queueName, acls);
|
||||
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
|
||||
|
|
|
@ -74,7 +74,7 @@ public class QueueManager {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get a queue by name, creating it if the create param is true and is necessary.
|
||||
* Get a leaf queue by name, creating it if the create param is true and is necessary.
|
||||
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
|
||||
* parent queue, or one of the parents in its name is already a leaf queue,
|
||||
* null is returned.
|
||||
|
@ -85,31 +85,53 @@ public class QueueManager {
|
|||
* could be referred to as just "parent1.queue2".
|
||||
*/
|
||||
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
||||
FSQueue queue = getQueue(name, create, FSQueueType.LEAF);
|
||||
if (queue instanceof FSParentQueue) {
|
||||
return null;
|
||||
}
|
||||
return (FSLeafQueue) queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a parent queue by name, creating it if the create param is true and is necessary.
|
||||
* If the queue is not or can not be a parent queue, i.e. it already exists as a
|
||||
* leaf queue, or one of the parents in its name is already a leaf queue,
|
||||
* null is returned.
|
||||
*
|
||||
* The root part of the name is optional, so a queue underneath the root
|
||||
* named "queue1" could be referred to as just "queue1", and a queue named
|
||||
* "queue2" underneath a parent named "parent1" that is underneath the root
|
||||
* could be referred to as just "parent1.queue2".
|
||||
*/
|
||||
public FSParentQueue getParentQueue(String name, boolean create) {
|
||||
FSQueue queue = getQueue(name, create, FSQueueType.PARENT);
|
||||
if (queue instanceof FSLeafQueue) {
|
||||
return null;
|
||||
}
|
||||
return (FSParentQueue) queue;
|
||||
}
|
||||
|
||||
private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
|
||||
name = ensureRootPrefix(name);
|
||||
synchronized (queues) {
|
||||
FSQueue queue = queues.get(name);
|
||||
if (queue == null && create) {
|
||||
FSLeafQueue leafQueue = createLeafQueue(name);
|
||||
if (leafQueue == null) {
|
||||
return null;
|
||||
}
|
||||
queue = leafQueue;
|
||||
} else if (queue instanceof FSParentQueue) {
|
||||
return null;
|
||||
// if the queue doesn't exist,create it and return
|
||||
queue = createQueue(name, queueType);
|
||||
}
|
||||
return (FSLeafQueue)queue;
|
||||
return queue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a leaf queue and places it in the tree. Creates any
|
||||
* parents that don't already exist.
|
||||
* Creates a leaf or parent queue based on what is specified in 'queueType'
|
||||
* and places it in the tree. Creates any parents that don't already exist.
|
||||
*
|
||||
* @return
|
||||
* the created queue, if successful. null if not allowed (one of the parent
|
||||
* queues in the queue name is already a leaf queue)
|
||||
*/
|
||||
private FSLeafQueue createLeafQueue(String name) {
|
||||
private FSQueue createQueue(String name, FSQueueType queueType) {
|
||||
List<String> newQueueNames = new ArrayList<String>();
|
||||
newQueueNames.add(name);
|
||||
int sepIndex = name.length();
|
||||
|
@ -143,8 +165,7 @@ public class QueueManager {
|
|||
FSLeafQueue leafQueue = null;
|
||||
for (int i = newQueueNames.size()-1; i >= 0; i--) {
|
||||
String queueName = newQueueNames.get(i);
|
||||
if (i == 0) {
|
||||
// First name added was the leaf queue
|
||||
if (i == 0 && queueType != FSQueueType.PARENT) {
|
||||
leafQueue = new FSLeafQueue(name, scheduler, parent);
|
||||
try {
|
||||
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
|
||||
|
@ -155,6 +176,7 @@ public class QueueManager {
|
|||
parent.addChildQueue(leafQueue);
|
||||
queues.put(leafQueue.getName(), leafQueue);
|
||||
leafQueues.add(leafQueue);
|
||||
return leafQueue;
|
||||
} else {
|
||||
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
|
||||
try {
|
||||
|
@ -169,53 +191,64 @@ public class QueueManager {
|
|||
}
|
||||
}
|
||||
|
||||
return leafQueue;
|
||||
return parent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Make way for the given leaf queue if possible, by removing incompatible
|
||||
* Make way for the given queue if possible, by removing incompatible
|
||||
* queues with no apps in them. Incompatibility could be due to
|
||||
* (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in
|
||||
* the ancestry of leafToCreate.
|
||||
* (1) queueToCreate being currently a parent but needs to change to leaf
|
||||
* (2) queueToCreate being currently a leaf but needs to change to parent
|
||||
* (3) an existing leaf queue in the ancestry of queueToCreate.
|
||||
*
|
||||
* We will never remove the root queue or the default queue in this way.
|
||||
*
|
||||
* @return true if we can create leafToCreate or it already exists.
|
||||
* @return true if we can create queueToCreate or it already exists.
|
||||
*/
|
||||
private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
|
||||
leafToCreate = ensureRootPrefix(leafToCreate);
|
||||
private boolean removeEmptyIncompatibleQueues(String queueToCreate,
|
||||
FSQueueType queueType) {
|
||||
queueToCreate = ensureRootPrefix(queueToCreate);
|
||||
|
||||
// Ensure leafToCreate is not root and doesn't have the default queue in its
|
||||
// Ensure queueToCreate is not root and doesn't have the default queue in its
|
||||
// ancestry.
|
||||
if (leafToCreate.equals(ROOT_QUEUE) ||
|
||||
leafToCreate.startsWith(
|
||||
if (queueToCreate.equals(ROOT_QUEUE) ||
|
||||
queueToCreate.startsWith(
|
||||
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
|
||||
return false;
|
||||
}
|
||||
|
||||
FSQueue queue = queues.get(leafToCreate);
|
||||
FSQueue queue = queues.get(queueToCreate);
|
||||
// Queue exists already.
|
||||
if (queue != null) {
|
||||
if (queue instanceof FSLeafQueue) {
|
||||
// If it's an already existing leaf, we're ok.
|
||||
return true;
|
||||
if (queueType == FSQueueType.LEAF) {
|
||||
// if queue is already a leaf then return true
|
||||
return true;
|
||||
}
|
||||
// remove incompatibility since queue is a leaf currently
|
||||
// needs to change to a parent.
|
||||
return removeQueueIfEmpty(queue);
|
||||
} else {
|
||||
// If it's an existing parent queue, remove it if it's empty.
|
||||
if (queueType == FSQueueType.PARENT) {
|
||||
return true;
|
||||
}
|
||||
// If it's an existing parent queue and needs to change to leaf,
|
||||
// remove it if it's empty.
|
||||
return removeQueueIfEmpty(queue);
|
||||
}
|
||||
}
|
||||
|
||||
// Queue doesn't exist already. Check if the new queue would be created
|
||||
// under an existing leaf queue. If so, try removing that leaf queue.
|
||||
int sepIndex = leafToCreate.length();
|
||||
sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
|
||||
int sepIndex = queueToCreate.length();
|
||||
sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
|
||||
while (sepIndex != -1) {
|
||||
String prefixString = leafToCreate.substring(0, sepIndex);
|
||||
String prefixString = queueToCreate.substring(0, sepIndex);
|
||||
FSQueue prefixQueue = queues.get(prefixString);
|
||||
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
|
||||
return removeQueueIfEmpty(prefixQueue);
|
||||
}
|
||||
sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
|
||||
sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
@ -312,12 +345,21 @@ public class QueueManager {
|
|||
}
|
||||
|
||||
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
||||
// Make sure all queues exist
|
||||
for (String name : queueConf.getQueueNames()) {
|
||||
if (removeEmptyIncompatibleQueues(name)) {
|
||||
// Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
|
||||
for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
|
||||
if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
|
||||
getLeafQueue(name, true);
|
||||
}
|
||||
}
|
||||
|
||||
// At this point all leaves and 'parents with at least one child' would have been created.
|
||||
// Now create parents with no configured leaf.
|
||||
for (String name : queueConf.getConfiguredQueues().get(
|
||||
FSQueueType.PARENT)) {
|
||||
if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
|
||||
getParentQueue(name, true);
|
||||
}
|
||||
}
|
||||
|
||||
for (FSQueue queue : queues.values()) {
|
||||
// Update queue metrics
|
||||
|
|
|
@ -42,17 +42,19 @@ public class QueuePlacementPolicy {
|
|||
map.put("secondaryGroupExistingQueue",
|
||||
QueuePlacementRule.SecondaryGroupExistingQueue.class);
|
||||
map.put("specified", QueuePlacementRule.Specified.class);
|
||||
map.put("nestedUserQueue",
|
||||
QueuePlacementRule.NestedUserQueue.class);
|
||||
map.put("default", QueuePlacementRule.Default.class);
|
||||
map.put("reject", QueuePlacementRule.Reject.class);
|
||||
ruleClasses = Collections.unmodifiableMap(map);
|
||||
}
|
||||
|
||||
private final List<QueuePlacementRule> rules;
|
||||
private final Set<String> configuredQueues;
|
||||
private final Map<FSQueueType, Set<String>> configuredQueues;
|
||||
private final Groups groups;
|
||||
|
||||
public QueuePlacementPolicy(List<QueuePlacementRule> rules,
|
||||
Set<String> configuredQueues, Configuration conf)
|
||||
Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
|
||||
throws AllocationConfigurationException {
|
||||
for (int i = 0; i < rules.size()-1; i++) {
|
||||
if (rules.get(i).isTerminal()) {
|
||||
|
@ -72,40 +74,53 @@ public class QueuePlacementPolicy {
|
|||
/**
|
||||
* Builds a QueuePlacementPolicy from an xml element.
|
||||
*/
|
||||
public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues,
|
||||
Configuration conf) throws AllocationConfigurationException {
|
||||
public static QueuePlacementPolicy fromXml(Element el,
|
||||
Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
|
||||
throws AllocationConfigurationException {
|
||||
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
|
||||
NodeList elements = el.getChildNodes();
|
||||
for (int i = 0; i < elements.getLength(); i++) {
|
||||
Node node = elements.item(i);
|
||||
if (node instanceof Element) {
|
||||
Element element = (Element)node;
|
||||
|
||||
String ruleName = element.getAttribute("name");
|
||||
if ("".equals(ruleName)) {
|
||||
throw new AllocationConfigurationException("No name provided for a " +
|
||||
"rule element");
|
||||
}
|
||||
|
||||
Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
|
||||
if (clazz == null) {
|
||||
throw new AllocationConfigurationException("No rule class found for "
|
||||
+ ruleName);
|
||||
}
|
||||
QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
|
||||
rule.initializeFromXml(element);
|
||||
QueuePlacementRule rule = createAndInitializeRule(node);
|
||||
rules.add(rule);
|
||||
}
|
||||
}
|
||||
return new QueuePlacementPolicy(rules, configuredQueues, conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create and initialize a rule given a xml node
|
||||
* @param node
|
||||
* @return QueuePlacementPolicy
|
||||
* @throws AllocationConfigurationException
|
||||
*/
|
||||
public static QueuePlacementRule createAndInitializeRule(Node node)
|
||||
throws AllocationConfigurationException {
|
||||
Element element = (Element) node;
|
||||
|
||||
String ruleName = element.getAttribute("name");
|
||||
if ("".equals(ruleName)) {
|
||||
throw new AllocationConfigurationException("No name provided for a "
|
||||
+ "rule element");
|
||||
}
|
||||
|
||||
Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
|
||||
if (clazz == null) {
|
||||
throw new AllocationConfigurationException("No rule class found for "
|
||||
+ ruleName);
|
||||
}
|
||||
QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
|
||||
rule.initializeFromXml(element);
|
||||
return rule;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a simple queue placement policy from the allow-undeclared-pools and
|
||||
* user-as-default-queue configuration options.
|
||||
*/
|
||||
public static QueuePlacementPolicy fromConfiguration(Configuration conf,
|
||||
Set<String> configuredQueues) {
|
||||
Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
boolean create = conf.getBoolean(
|
||||
FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
|
||||
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
|
||||
|
|
|
@ -18,16 +18,19 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.NamedNodeMap;
|
||||
import org.w3c.dom.Node;
|
||||
import org.w3c.dom.NodeList;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public abstract class QueuePlacementRule {
|
||||
protected boolean create;
|
||||
|
@ -58,16 +61,20 @@ public abstract class QueuePlacementRule {
|
|||
* continue to the next rule, and null indicates that the app should be rejected.
|
||||
*/
|
||||
public String assignAppToQueue(String requestedQueue, String user,
|
||||
Groups groups, Collection<String> configuredQueues) throws IOException {
|
||||
String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues);
|
||||
if (create || configuredQueues.contains(queue)) {
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
||||
throws IOException {
|
||||
String queue = getQueueForApp(requestedQueue, user, groups,
|
||||
configuredQueues);
|
||||
if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
|
||||
|| configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
|
||||
return queue;
|
||||
} else {
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
||||
public void initializeFromXml(Element el) {
|
||||
public void initializeFromXml(Element el)
|
||||
throws AllocationConfigurationException {
|
||||
boolean create = true;
|
||||
NamedNodeMap attributes = el.getAttributes();
|
||||
Map<String, String> args = new HashMap<String, String>();
|
||||
|
@ -104,15 +111,16 @@ public abstract class QueuePlacementRule {
|
|||
* continue to the next rule.
|
||||
*/
|
||||
protected abstract String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Collection<String> configuredQueues) throws IOException;
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Places apps in queues by username of the submitter
|
||||
*/
|
||||
public static class User extends QueuePlacementRule {
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue,
|
||||
String user, Groups groups, Collection<String> configuredQueues) {
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
return "root." + user;
|
||||
}
|
||||
|
||||
|
@ -127,9 +135,9 @@ public abstract class QueuePlacementRule {
|
|||
*/
|
||||
public static class PrimaryGroup extends QueuePlacementRule {
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue,
|
||||
String user, Groups groups,
|
||||
Collection<String> configuredQueues) throws IOException {
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
||||
throws IOException {
|
||||
return "root." + groups.getGroups(user).get(0);
|
||||
}
|
||||
|
||||
|
@ -147,12 +155,15 @@ public abstract class QueuePlacementRule {
|
|||
*/
|
||||
public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue,
|
||||
String user, Groups groups,
|
||||
Collection<String> configuredQueues) throws IOException {
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
||||
throws IOException {
|
||||
List<String> groupNames = groups.getGroups(user);
|
||||
for (int i = 1; i < groupNames.size(); i++) {
|
||||
if (configuredQueues.contains("root." + groupNames.get(i))) {
|
||||
String group = groupNames.get(i);
|
||||
if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
|
||||
|| configuredQueues.get(FSQueueType.PARENT).contains(
|
||||
"root." + group)) {
|
||||
return "root." + groupNames.get(i);
|
||||
}
|
||||
}
|
||||
|
@ -166,13 +177,84 @@ public abstract class QueuePlacementRule {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Places apps in queues with name of the submitter under the queue
|
||||
* returned by the nested rule.
|
||||
*/
|
||||
public static class NestedUserQueue extends QueuePlacementRule {
|
||||
@VisibleForTesting
|
||||
QueuePlacementRule nestedRule;
|
||||
|
||||
/**
|
||||
* Parse xml and instantiate the nested rule
|
||||
*/
|
||||
@Override
|
||||
public void initializeFromXml(Element el)
|
||||
throws AllocationConfigurationException {
|
||||
NodeList elements = el.getChildNodes();
|
||||
|
||||
for (int i = 0; i < elements.getLength(); i++) {
|
||||
Node node = elements.item(i);
|
||||
if (node instanceof Element) {
|
||||
Element element = (Element) node;
|
||||
if ("rule".equals(element.getTagName())) {
|
||||
QueuePlacementRule rule = QueuePlacementPolicy
|
||||
.createAndInitializeRule(node);
|
||||
if (rule == null) {
|
||||
throw new AllocationConfigurationException(
|
||||
"Unable to create nested rule in nestedUserQueue rule");
|
||||
}
|
||||
this.nestedRule = rule;
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (this.nestedRule == null) {
|
||||
throw new AllocationConfigurationException(
|
||||
"No nested rule specified in <nestedUserQueue> rule");
|
||||
}
|
||||
super.initializeFromXml(el);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
|
||||
throws IOException {
|
||||
// Apply the nested rule
|
||||
String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
|
||||
groups, configuredQueues);
|
||||
|
||||
if (queueName != null && queueName != "") {
|
||||
if (!queueName.startsWith("root.")) {
|
||||
queueName = "root." + queueName;
|
||||
}
|
||||
|
||||
// Verify if the queue returned by the nested rule is an configured leaf queue,
|
||||
// if yes then skip to next rule in the queue placement policy
|
||||
if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
|
||||
return "";
|
||||
}
|
||||
return queueName + "." + user;
|
||||
}
|
||||
return queueName;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTerminal() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Places apps in queues by requested queue of the submitter
|
||||
*/
|
||||
public static class Specified extends QueuePlacementRule {
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue,
|
||||
String user, Groups groups, Collection<String> configuredQueues) {
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
|
||||
return "";
|
||||
} else {
|
||||
|
@ -195,7 +277,7 @@ public abstract class QueuePlacementRule {
|
|||
public static class Default extends QueuePlacementRule {
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Collection<String> configuredQueues) {
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
|
||||
}
|
||||
|
||||
|
@ -211,13 +293,13 @@ public abstract class QueuePlacementRule {
|
|||
public static class Reject extends QueuePlacementRule {
|
||||
@Override
|
||||
public String assignAppToQueue(String requestedQueue, String user,
|
||||
Groups groups, Collection<String> configuredQueues) {
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getQueueForApp(String requestedQueue, String user,
|
||||
Groups groups, Collection<String> configuredQueues) {
|
||||
Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
|
@ -28,6 +27,7 @@ import java.util.List;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
|
@ -99,9 +99,12 @@ public class TestAllocationFileLoaderService {
|
|||
assertEquals(1, rules.size());
|
||||
assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
|
||||
assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
|
||||
assertEquals(2, allocConf.getQueueNames().size());
|
||||
assertTrue(allocConf.getQueueNames().contains("root.queueA"));
|
||||
assertTrue(allocConf.getQueueNames().contains("root.queueB"));
|
||||
assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
|
||||
.size());
|
||||
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
|
||||
.contains("root.queueA"));
|
||||
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
|
||||
.contains("root.queueB"));
|
||||
|
||||
confHolder.allocConf = null;
|
||||
|
||||
|
@ -114,6 +117,9 @@ public class TestAllocationFileLoaderService {
|
|||
out.println(" </queue>");
|
||||
out.println(" <queuePlacementPolicy>");
|
||||
out.println(" <rule name='specified' />");
|
||||
out.println(" <rule name='nestedUserQueue' >");
|
||||
out.println(" <rule name='primaryGroup' />");
|
||||
out.println(" </rule>");
|
||||
out.println(" <rule name='default' />");
|
||||
out.println(" </queuePlacementPolicy>");
|
||||
out.println("</allocations>");
|
||||
|
@ -131,12 +137,18 @@ public class TestAllocationFileLoaderService {
|
|||
allocConf = confHolder.allocConf;
|
||||
policy = allocConf.getPlacementPolicy();
|
||||
rules = policy.getRules();
|
||||
assertEquals(2, rules.size());
|
||||
assertEquals(3, rules.size());
|
||||
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
|
||||
assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass());
|
||||
assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
|
||||
.getClass());
|
||||
assertEquals(QueuePlacementRule.PrimaryGroup.class,
|
||||
((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
|
||||
assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
|
||||
assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
|
||||
assertEquals(1, allocConf.getQueueNames().size());
|
||||
assertTrue(allocConf.getQueueNames().contains("root.queueB"));
|
||||
assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
|
||||
.size());
|
||||
assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
|
||||
.contains("root.queueB"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -170,6 +182,14 @@ public class TestAllocationFileLoaderService {
|
|||
out.println("<queue name=\"queueE\">");
|
||||
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
|
||||
out.println("</queue>");
|
||||
//Make queue F a parent queue without configured leaf queues using the 'type' attribute
|
||||
out.println("<queue name=\"queueF\" type=\"parent\" >");
|
||||
out.println("</queue>");
|
||||
//Create hierarchical queues G,H
|
||||
out.println("<queue name=\"queueG\">");
|
||||
out.println(" <queue name=\"queueH\">");
|
||||
out.println(" </queue>");
|
||||
out.println("</queue>");
|
||||
// Set default limit of apps per queue to 15
|
||||
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
|
||||
// Set default limit of apps per user to 5
|
||||
|
@ -194,7 +214,7 @@ public class TestAllocationFileLoaderService {
|
|||
allocLoader.reloadAllocations();
|
||||
AllocationConfiguration queueConf = confHolder.allocConf;
|
||||
|
||||
assertEquals(5, queueConf.getQueueNames().size());
|
||||
assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
|
||||
assertEquals(Resources.createResource(0),
|
||||
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||
assertEquals(Resources.createResource(0),
|
||||
|
@ -250,6 +270,14 @@ public class TestAllocationFileLoaderService {
|
|||
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
|
||||
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
|
||||
|
||||
assertTrue(queueConf.getConfiguredQueues()
|
||||
.get(FSQueueType.PARENT)
|
||||
.contains("root.queueF"));
|
||||
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
|
||||
.contains("root.queueG"));
|
||||
assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
|
||||
.contains("root.queueG.queueH"));
|
||||
|
||||
// Verify existing queues have default scheduling policy
|
||||
assertEquals(DominantResourceFairnessPolicy.NAME,
|
||||
queueConf.getSchedulingPolicy("root").getName());
|
||||
|
@ -315,7 +343,7 @@ public class TestAllocationFileLoaderService {
|
|||
allocLoader.reloadAllocations();
|
||||
AllocationConfiguration queueConf = confHolder.allocConf;
|
||||
|
||||
assertEquals(5, queueConf.getQueueNames().size());
|
||||
assertEquals(5, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
|
||||
assertEquals(Resources.createResource(0),
|
||||
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
|
||||
assertEquals(Resources.createResource(0),
|
||||
|
|
|
@ -34,6 +34,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
@ -43,7 +44,6 @@ import java.util.Set;
|
|||
import javax.xml.parsers.ParserConfigurationException;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
|
@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
|
@ -737,8 +738,11 @@ public class TestFairScheduler {
|
|||
rules.add(new QueuePlacementRule.Default().initialize(true, null));
|
||||
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
|
||||
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
|
||||
Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
|
||||
configuredQueues.put(FSQueueType.LEAF, queues);
|
||||
configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
|
||||
scheduler.getAllocationConfiguration().placementPolicy =
|
||||
new QueuePlacementPolicy(rules, queues, conf);
|
||||
new QueuePlacementPolicy(rules, configuredQueues, conf);
|
||||
appId = createSchedulingRequest(1024, "somequeue", "user1");
|
||||
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
|
||||
appId = createSchedulingRequest(1024, "default", "user1");
|
||||
|
@ -758,7 +762,7 @@ public class TestFairScheduler {
|
|||
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
|
||||
rules.add(new QueuePlacementRule.Default().initialize(true, null));
|
||||
scheduler.getAllocationConfiguration().placementPolicy =
|
||||
new QueuePlacementPolicy(rules, queues, conf);
|
||||
new QueuePlacementPolicy(rules, configuredQueues, conf);
|
||||
appId = createSchedulingRequest(1024, "somequeue", "user1");
|
||||
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
|
||||
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
|
||||
|
@ -809,7 +813,89 @@ public class TestFairScheduler {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueue() throws IOException {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"user1group\" type=\"parent\">");
|
||||
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queuePlacementPolicy>");
|
||||
out.println("<rule name=\"specified\" create=\"false\" />");
|
||||
out.println("<rule name=\"nestedUserQueue\">");
|
||||
out.println(" <rule name=\"primaryGroup\" create=\"false\" />");
|
||||
out.println("</rule>");
|
||||
out.println("<rule name=\"default\" />");
|
||||
out.println("</queuePlacementPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
|
||||
|
||||
FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
|
||||
"user1");
|
||||
|
||||
assertEquals("root.user1group.user1", user1Leaf.getName());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"parentq\" type=\"parent\">");
|
||||
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queuePlacementPolicy>");
|
||||
out.println("<rule name=\"nestedUserQueue\">");
|
||||
out.println(" <rule name=\"specified\" create=\"false\" />");
|
||||
out.println("</rule>");
|
||||
out.println("<rule name=\"default\" />");
|
||||
out.println("</queuePlacementPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
|
||||
RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
|
||||
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
int capacity = 16 * 1024;
|
||||
// create node with 16 G
|
||||
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity),
|
||||
1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// user1,user2 submit their apps to parentq and create user queues
|
||||
scheduler.assignToQueue(rmApp1, "root.parentq", "user1");
|
||||
scheduler.assignToQueue(rmApp2, "root.parentq", "user2");
|
||||
|
||||
scheduler.update();
|
||||
|
||||
Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
|
||||
.getLeafQueues();
|
||||
|
||||
for (FSLeafQueue leaf : leafQueues) {
|
||||
if (leaf.getName().equals("root.parentq.user1")
|
||||
|| leaf.getName().equals("root.parentq.user2")) {
|
||||
// assert that the fair share is 1/4th node1's capacity
|
||||
assertEquals(capacity / 4, leaf.getFairShare().getMemory());
|
||||
// assert weights are equal for both the user queues
|
||||
assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Make allocation requests and ensure they are reflected in queue demand.
|
||||
*/
|
||||
|
|
|
@ -17,8 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
import java.util.HashSet;
|
||||
|
@ -57,45 +56,77 @@ public class TestQueueManager {
|
|||
|
||||
@Test
|
||||
public void testReloadTurnsLeafQueueIntoParent() throws Exception {
|
||||
updateConfiguredQueues(queueManager, "queue1");
|
||||
updateConfiguredLeafQueues(queueManager, "queue1");
|
||||
|
||||
// When no apps are running in the leaf queue, should be fine turning it
|
||||
// into a parent.
|
||||
updateConfiguredQueues(queueManager, "queue1.queue2");
|
||||
updateConfiguredLeafQueues(queueManager, "queue1.queue2");
|
||||
assertNull(queueManager.getLeafQueue("queue1", false));
|
||||
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||
|
||||
// When leaf queues are empty, should be ok deleting them and
|
||||
// turning parent into a leaf.
|
||||
updateConfiguredQueues(queueManager, "queue1");
|
||||
updateConfiguredLeafQueues(queueManager, "queue1");
|
||||
assertNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||
assertNotNull(queueManager.getLeafQueue("queue1", false));
|
||||
|
||||
// When apps exist in leaf queue, we shouldn't be able to create
|
||||
// children under it, but things should work otherwise.
|
||||
notEmptyQueues.add(queueManager.getLeafQueue("queue1", false));
|
||||
updateConfiguredQueues(queueManager, "queue1.queue2");
|
||||
updateConfiguredLeafQueues(queueManager, "queue1.queue2");
|
||||
assertNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||
assertNotNull(queueManager.getLeafQueue("queue1", false));
|
||||
|
||||
// When apps exist in leaf queues under a parent queue, shouldn't be
|
||||
// able to turn it into a leaf queue, but things should work otherwise.
|
||||
notEmptyQueues.clear();
|
||||
updateConfiguredQueues(queueManager, "queue1.queue2");
|
||||
updateConfiguredLeafQueues(queueManager, "queue1.queue2");
|
||||
notEmptyQueues.add(queueManager.getQueue("root.queue1"));
|
||||
updateConfiguredQueues(queueManager, "queue1");
|
||||
updateConfiguredLeafQueues(queueManager, "queue1");
|
||||
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
|
||||
assertNull(queueManager.getLeafQueue("queue1", false));
|
||||
|
||||
// Should never to be able to create a queue under the default queue
|
||||
updateConfiguredQueues(queueManager, "default.queue3");
|
||||
updateConfiguredLeafQueues(queueManager, "default.queue3");
|
||||
assertNull(queueManager.getLeafQueue("default.queue3", false));
|
||||
assertNotNull(queueManager.getLeafQueue("default", false));
|
||||
}
|
||||
|
||||
private void updateConfiguredQueues(QueueManager queueMgr, String... confQueues) {
|
||||
@Test
|
||||
public void testReloadTurnsLeafToParentWithNoLeaf() {
|
||||
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
||||
allocConf.queueNames = Sets.newHashSet(confQueues);
|
||||
// Create a leaf queue1
|
||||
allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1");
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
assertNotNull(queueManager.getLeafQueue("root.queue1", false));
|
||||
|
||||
// Lets say later on admin makes queue1 a parent queue by
|
||||
// specifying "type=parent" in the alloc xml and lets say apps running in
|
||||
// queue1
|
||||
notEmptyQueues.add(queueManager.getLeafQueue("root.queue1", false));
|
||||
allocConf = new AllocationConfiguration(conf);
|
||||
allocConf.configuredQueues.get(FSQueueType.PARENT)
|
||||
.add("root.queue1");
|
||||
|
||||
// When allocs are reloaded queue1 shouldn't be converter to parent
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
assertNotNull(queueManager.getLeafQueue("root.queue1", false));
|
||||
assertNull(queueManager.getParentQueue("root.queue1", false));
|
||||
|
||||
// Now lets assume apps completed and there are no apps in queue1
|
||||
notEmptyQueues.clear();
|
||||
// We should see queue1 transform from leaf queue to parent queue.
|
||||
queueManager.updateAllocationConfiguration(allocConf);
|
||||
assertNull(queueManager.getLeafQueue("root.queue1", false));
|
||||
assertNotNull(queueManager.getParentQueue("root.queue1", false));
|
||||
// this parent should not have any children
|
||||
assertTrue(queueManager.getParentQueue("root.queue1", false)
|
||||
.getChildQueues().isEmpty());
|
||||
}
|
||||
|
||||
private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) {
|
||||
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
|
||||
allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
|
||||
queueMgr.updateAllocationConfiguration(allocConf);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -17,8 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
|
@ -28,16 +31,15 @@ import org.apache.commons.io.IOUtils;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
|
||||
public class TestQueuePlacementPolicy {
|
||||
private final static Configuration conf = new Configuration();
|
||||
private final static Set<String> configuredQueues = Sets.newHashSet("root.someuser");
|
||||
private Map<FSQueueType, Set<String>> configuredQueues;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() {
|
||||
|
@ -45,6 +47,14 @@ public class TestQueuePlacementPolicy {
|
|||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void initTest() {
|
||||
configuredQueues = new HashMap<FSQueueType, Set<String>>();
|
||||
for (FSQueueType type : FSQueueType.values()) {
|
||||
configuredQueues.put(type, new HashSet<String>());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSpecifiedUserPolicy() throws Exception {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
|
@ -53,9 +63,12 @@ public class TestQueuePlacementPolicy {
|
|||
sb.append(" <rule name='user' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser"));
|
||||
assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
|
||||
assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser"));
|
||||
assertEquals("root.specifiedq",
|
||||
policy.assignAppToQueue("specifiedq", "someuser"));
|
||||
assertEquals("root.someuser",
|
||||
policy.assignAppToQueue("default", "someuser"));
|
||||
assertEquals("root.otheruser",
|
||||
policy.assignAppToQueue("default", "otheruser"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -66,6 +79,8 @@ public class TestQueuePlacementPolicy {
|
|||
sb.append(" <rule name='user' create=\"false\" />");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
configuredQueues.get(FSQueueType.LEAF).add("root.someuser");
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
|
||||
assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
|
||||
|
@ -81,7 +96,8 @@ public class TestQueuePlacementPolicy {
|
|||
sb.append(" <rule name='reject' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
|
||||
assertEquals("root.specifiedq",
|
||||
policy.assignAppToQueue("specifiedq", "someuser"));
|
||||
assertEquals(null, policy.assignAppToQueue("default", "someuser"));
|
||||
}
|
||||
|
||||
|
@ -117,10 +133,188 @@ public class TestQueuePlacementPolicy {
|
|||
parse(sb.toString());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueueParsingErrors() {
|
||||
// No nested rule specified in hierarchical user queue
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='specified' />");
|
||||
sb.append(" <rule name='nestedUserQueue'/>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
assertIfExceptionThrown(sb);
|
||||
|
||||
// Specified nested rule is not a QueuePlacementRule
|
||||
sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='specified' />");
|
||||
sb.append(" <rule name='nestedUserQueue'>");
|
||||
sb.append(" <rule name='unknownRule'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
assertIfExceptionThrown(sb);
|
||||
}
|
||||
|
||||
private void assertIfExceptionThrown(StringBuffer sb) {
|
||||
Throwable th = null;
|
||||
try {
|
||||
parse(sb.toString());
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
|
||||
assertTrue(th instanceof AllocationConfigurationException);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueueParsing() throws Exception {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='specified' />");
|
||||
sb.append(" <rule name='nestedUserQueue'>");
|
||||
sb.append(" <rule name='primaryGroup'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
Throwable th = null;
|
||||
try {
|
||||
parse(sb.toString());
|
||||
} catch (Exception e) {
|
||||
th = e;
|
||||
}
|
||||
|
||||
assertNull(th);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueuePrimaryGroup() throws Exception {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='specified' create='false' />");
|
||||
sb.append(" <rule name='nestedUserQueue'>");
|
||||
sb.append(" <rule name='primaryGroup'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
// User queue would be created under primary group queue
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
assertEquals("root.user1group.user1",
|
||||
policy.assignAppToQueue("root.default", "user1"));
|
||||
// Other rules above and below hierarchical user queue rule should work as
|
||||
// usual
|
||||
configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq");
|
||||
// test if specified rule(above nestedUserQueue rule) works ok
|
||||
assertEquals("root.specifiedq",
|
||||
policy.assignAppToQueue("root.specifiedq", "user2"));
|
||||
|
||||
// test if default rule(below nestedUserQueue rule) works
|
||||
configuredQueues.get(FSQueueType.LEAF).add("root.user3group");
|
||||
assertEquals("root.default",
|
||||
policy.assignAppToQueue("root.default", "user3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueuePrimaryGroupNoCreate() throws Exception {
|
||||
// Primary group rule has create='false'
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='nestedUserQueue'>");
|
||||
sb.append(" <rule name='primaryGroup' create='false'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
|
||||
// Should return root.default since primary group 'root.user1group' is not
|
||||
// configured
|
||||
assertEquals("root.default",
|
||||
policy.assignAppToQueue("root.default", "user1"));
|
||||
|
||||
// Let's configure primary group and check if user queue is created
|
||||
configuredQueues.get(FSQueueType.PARENT).add("root.user1group");
|
||||
policy = parse(sb.toString());
|
||||
assertEquals("root.user1group.user1",
|
||||
policy.assignAppToQueue("root.default", "user1"));
|
||||
|
||||
// Both Primary group and nestedUserQueue rule has create='false'
|
||||
sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='nestedUserQueue' create='false'>");
|
||||
sb.append(" <rule name='primaryGroup' create='false'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
// Should return root.default since primary group and user queue for user 2
|
||||
// are not configured.
|
||||
assertEquals("root.default",
|
||||
policy.assignAppToQueue("root.default", "user2"));
|
||||
|
||||
// Now configure both primary group and the user queue for user2
|
||||
configuredQueues.get(FSQueueType.PARENT).add("root.user2group");
|
||||
configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2");
|
||||
policy = parse(sb.toString());
|
||||
|
||||
assertEquals("root.user2group.user2",
|
||||
policy.assignAppToQueue("root.default", "user2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueueSecondaryGroup() throws Exception {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='nestedUserQueue'>");
|
||||
sb.append(" <rule name='secondaryGroupExistingQueue'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
// Should return root.default since secondary groups are not configured
|
||||
assertEquals("root.default",
|
||||
policy.assignAppToQueue("root.default", "user1"));
|
||||
|
||||
// configure secondary group for user1
|
||||
configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1");
|
||||
policy = parse(sb.toString());
|
||||
// user queue created should be created under secondary group
|
||||
assertEquals("root.user1subgroup1.user1",
|
||||
policy.assignAppToQueue("root.default", "user1"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNestedUserQueueSpecificRule() throws Exception {
|
||||
// This test covers the use case where users can specify different parent
|
||||
// queues and want user queues under those.
|
||||
StringBuffer sb = new StringBuffer();
|
||||
sb.append("<queuePlacementPolicy>");
|
||||
sb.append(" <rule name='nestedUserQueue'>");
|
||||
sb.append(" <rule name='specified' create='false'/>");
|
||||
sb.append(" </rule>");
|
||||
sb.append(" <rule name='default' />");
|
||||
sb.append("</queuePlacementPolicy>");
|
||||
|
||||
// Let's create couple of parent queues
|
||||
configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
|
||||
configuredQueues.get(FSQueueType.PARENT).add("root.parent2");
|
||||
|
||||
QueuePlacementPolicy policy = parse(sb.toString());
|
||||
assertEquals("root.parent1.user1",
|
||||
policy.assignAppToQueue("root.parent1", "user1"));
|
||||
assertEquals("root.parent2.user2",
|
||||
policy.assignAppToQueue("root.parent2", "user2"));
|
||||
}
|
||||
|
||||
private QueuePlacementPolicy parse(String str) throws Exception {
|
||||
// Read and parse the allocations file.
|
||||
DocumentBuilderFactory docBuilderFactory =
|
||||
DocumentBuilderFactory.newInstance();
|
||||
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
|
||||
.newInstance();
|
||||
docBuilderFactory.setIgnoringComments(true);
|
||||
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
|
||||
Document doc = builder.parse(IOUtils.toInputStream(str));
|
||||
|
|
|
@ -206,8 +206,10 @@ Allocation file format
|
|||
The allocation file must be in XML format. The format contains five types of
|
||||
elements:
|
||||
|
||||
* <<Queue elements>>, which represent queues. Each may contain the following
|
||||
properties:
|
||||
* <<Queue elements>>, which represent queues. Queue elements can take an optional
|
||||
attribute ’type’,which when set to ‘parent’ makes it a parent queue. This is useful
|
||||
when we want to create a parent queue without configuring any leaf queues.
|
||||
Each queue element may contain the following properties:
|
||||
|
||||
* minResources: minimum resources the queue is entitled to, in the form
|
||||
"X mb, Y vcores". For the single-resource fairness policy, the vcores
|
||||
|
@ -299,6 +301,15 @@ Allocation file format
|
|||
that matches a secondary group of the user who submitted it. The first
|
||||
secondary group that matches a configured queue will be selected.
|
||||
|
||||
* nestedUserQueue : the app is placed into a queue with the name of the user
|
||||
under the queue suggested by the nested rule. This is similar to ‘user’
|
||||
rule,the difference being in ‘nestedUserQueue’ rule,user queues can be created
|
||||
under any parent queue, while ‘user’ rule creates user queues only under root queue.
|
||||
Note that nestedUserQueue rule would be applied only if the nested rule returns a
|
||||
parent queue.One can configure a parent queue either by setting ‘type’ attribute of queue
|
||||
to ‘parent’ or by configuring at least one leaf under that queue which makes it a parent.
|
||||
See example allocation for a sample use case.
|
||||
|
||||
* default: the app is placed into the queue named "default".
|
||||
|
||||
* reject: the app is rejected.
|
||||
|
@ -319,6 +330,12 @@ Allocation file format
|
|||
<minResources>5000 mb,0vcores</minResources>
|
||||
</queue>
|
||||
</queue>
|
||||
|
||||
<!—- Queue ‘secondary_group_queue’ is a parent queue and may have
|
||||
user queues under it -—>
|
||||
<queue name=“secondary_group_queue” type=“parent”>
|
||||
<weight>3.0</weight>
|
||||
</queue>
|
||||
|
||||
<user name="sample_user">
|
||||
<maxRunningApps>30</maxRunningApps>
|
||||
|
@ -328,6 +345,9 @@ Allocation file format
|
|||
<queuePlacementPolicy>
|
||||
<rule name="specified" />
|
||||
<rule name="primaryGroup" create="false" />
|
||||
<rule name=“nestedUserQueue”>
|
||||
<rule name=“secondaryGroupExistingQueue” create=“false” />
|
||||
</rule>
|
||||
<rule name="default" />
|
||||
</queuePlacementPolicy>
|
||||
</allocations>
|
||||
|
|
Loading…
Reference in New Issue