YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair Scheduler (Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1542105 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-11-14 22:12:13 +00:00
parent ea83f79373
commit 3858b9018e
5 changed files with 187 additions and 38 deletions

View File

@ -40,6 +40,9 @@ Release 2.3.0 - UNRELEASED
YARN-311. RM/scheduler support for dynamic resource configuration. YARN-311. RM/scheduler support for dynamic resource configuration.
(Junping Du via llu) (Junping Du via llu)
YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair
Scheduler (Sandy Ryza)
IMPROVEMENTS IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu) YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)

View File

@ -136,9 +136,6 @@ public class FairScheduler implements ResourceScheduler {
// How often fair shares are re-calculated (ms) // How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500; protected long UPDATE_INTERVAL = 500;
// Whether to use username in place of "default" queue name
private volatile boolean userAsDefaultQueue = false;
private final static List<Container> EMPTY_CONTAINER_LIST = private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>(); new ArrayList<Container>();
@ -640,6 +637,12 @@ public class FairScheduler implements ResourceScheduler {
RMApp rmApp = rmContext.getRMApps().get( RMApp rmApp = rmContext.getRMApps().get(
applicationAttemptId.getApplicationId()); applicationAttemptId.getApplicationId());
FSLeafQueue queue = assignToQueue(rmApp, queueName, user); FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
if (queue == null) {
rmContext.getDispatcher().getEventHandler().handle(
new RMAppAttemptRejectedEvent(applicationAttemptId,
"Application rejected by queue placement policy"));
return;
}
FSSchedulerApp schedulerApp = FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user, new FSSchedulerApp(applicationAttemptId, user,
@ -675,17 +678,16 @@ public class FairScheduler implements ResourceScheduler {
@VisibleForTesting @VisibleForTesting
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
// Potentially set queue to username if configured to do so FSLeafQueue queue = null;
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) && try {
userAsDefaultQueue) { QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
queueName = user; queueName = policy.assignAppToQueue(queueName, user);
if (queueName == null) {
return null;
} }
queue = queueMgr.getLeafQueue(queueName, true);
FSLeafQueue queue = queueMgr.getLeafQueue(queueName, } catch (IOException ex) {
conf.getAllowUndeclaredPools()); LOG.error("Error assigning app to queue, rejecting", ex);
if (queue == null) {
// queue is not an existing or createable leaf queue
queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, false);
} }
if (rmApp != null) { if (rmApp != null) {
@ -1155,7 +1157,6 @@ public class FairScheduler implements ResourceScheduler {
minimumAllocation = this.conf.getMinimumAllocation(); minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation(); maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation(); incrAllocation = this.conf.getIncrementAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled(); continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs = continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs(); this.conf.getContinuousSchedulingSleepMs();

View File

@ -25,6 +25,7 @@ import java.net.URLConnection;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
@ -51,6 +52,8 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Text; import org.w3c.dom.Text;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import com.google.common.annotations.VisibleForTesting;
/** /**
* Maintains a list of queues as well as scheduling parameters for each queue, * Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file. * such as guaranteed share allocations, from the fair scheduler config file.
@ -87,6 +90,8 @@ public class QueueManager {
private FSParentQueue rootQueue; private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo(); private volatile QueueManagerInfo info = new QueueManagerInfo();
@VisibleForTesting
volatile QueuePlacementPolicy placementPolicy;
private long lastReloadAttempt; // Last time we tried to reload the queues file private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues private long lastSuccessfulReload; // Last time we successfully reloaded queues
@ -107,6 +112,8 @@ public class QueueManager {
queues.put(rootQueue.getName(), rootQueue); queues.put(rootQueue.getName(), rootQueue);
this.allocFile = conf.getAllocationFile(); this.allocFile = conf.getAllocationFile();
placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
new HashSet<String>(), conf);
reloadAllocs(); reloadAllocs();
lastSuccessfulReload = scheduler.getClock().getTime(); lastSuccessfulReload = scheduler.getClock().getTime();
@ -115,6 +122,28 @@ public class QueueManager {
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true); getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
} }
public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
}
/**
* Construct simple queue placement policy from allow-undeclared-pools and
* user-as-default-queue.
*/
private List<QueuePlacementRule> getSimplePlacementRules() {
boolean create = scheduler.getConf().getAllowUndeclaredPools();
boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.Specified().initialize(create, null));
if (userAsDefaultQueue) {
rules.add(new QueuePlacementRule.User().initialize(create, null));
}
if (!userAsDefaultQueue || !create) {
rules.add(new QueuePlacementRule.Default().initialize(true, null));
}
return rules;
}
/** /**
* Get a queue by name, creating it if the create param is true and is necessary. * Get a queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a * If the queue is not or can not be a leaf queue, i.e. it already exists as a
@ -227,6 +256,10 @@ public class QueueManager {
} }
} }
public QueuePlacementPolicy getPlacementPolicy() {
return placementPolicy;
}
/** /**
* Reload allocations file if it hasn't been loaded in a while * Reload allocations file if it hasn't been loaded in a while
*/ */
@ -291,6 +324,8 @@ public class QueueManager {
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault(); SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc. // Remember all queue names so we can display them on web UI, etc.
List<String> queueNamesInAllocFile = new ArrayList<String>(); List<String> queueNamesInAllocFile = new ArrayList<String>();
@ -306,6 +341,7 @@ public class QueueManager {
"file: top-level element not <allocations>"); "file: top-level element not <allocations>");
NodeList elements = root.getChildNodes(); NodeList elements = root.getChildNodes();
List<Element> queueElements = new ArrayList<Element>(); List<Element> queueElements = new ArrayList<Element>();
Element placementPolicyElement = null;
for (int i = 0; i < elements.getLength(); i++) { for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i); Node node = elements.item(i);
if (node instanceof Element) { if (node instanceof Element) {
@ -348,6 +384,8 @@ public class QueueManager {
String text = ((Text)element.getFirstChild()).getData().trim(); String text = ((Text)element.getFirstChild()).getData().trim();
SchedulingPolicy.setDefault(text); SchedulingPolicy.setDefault(text);
defaultSchedPolicy = SchedulingPolicy.getDefault(); defaultSchedPolicy = SchedulingPolicy.getDefault();
} else if ("queuePlacementPolicy".equals(element.getTagName())) {
placementPolicyElement = element;
} else { } else {
LOG.warn("Bad element in allocations file: " + element.getTagName()); LOG.warn("Bad element in allocations file: " + element.getTagName());
} }
@ -370,6 +408,15 @@ public class QueueManager {
queueAcls, queueNamesInAllocFile); queueAcls, queueNamesInAllocFile);
} }
// Load placement policy and pass it configured queues
if (placementPolicyElement != null) {
newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
} else {
newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
}
// Commit the reload; also create any queue defined in the alloc file // Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI. // if it does not already exist, so it can be displayed on the web UI.
synchronized (this) { synchronized (this) {
@ -377,6 +424,7 @@ public class QueueManager {
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts, queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
placementPolicy = newPlacementPolicy;
// Make sure all queues exist // Make sure all queues exist
for (String name: queueNamesInAllocFile) { for (String name: queueNamesInAllocFile) {

View File

@ -44,7 +44,9 @@ import javax.xml.parsers.ParserConfigurationException;
import junit.framework.Assert; import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.MockApps; import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -94,6 +96,8 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xml.sax.SAXException; import org.xml.sax.SAXException;
import com.google.common.collect.Sets;
public class TestFairScheduler { public class TestFairScheduler {
private class MockClock implements Clock { private class MockClock implements Clock {
@ -616,6 +620,7 @@ public class TestFairScheduler {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.getQueueManager().initialize();
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent( AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
createAppAttemptId(2, 1), "default", "user2"); createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2); scheduler.handle(appAddedEvent2);
@ -665,6 +670,46 @@ public class TestFairScheduler {
assertEquals("root.notdefault", rmApp2.getQueue()); assertEquals("root.notdefault", rmApp2.getQueue());
} }
@Test
public void testQueuePlacementWithPolicy() throws Exception {
Configuration conf = createConfiguration();
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
ApplicationAttemptId appId;
Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.User().initialize(false, null));
rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null));
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group");
scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
rules, queues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.somequeue", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user1");
assertEquals("root.user1", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user3");
assertEquals("root.user3group", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", apps.get(appId).getQueueName());
// test without specified as first rule
rules = new ArrayList<QueuePlacementRule>();
rules.add(new QueuePlacementRule.User().initialize(false, null));
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null));
scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
rules, queues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.user1", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
assertEquals("root.somequeue", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", apps.get(appId).getQueueName());
}
@Test @Test
public void testFairShareWithMinAlloc() throws Exception { public void testFairShareWithMinAlloc() throws Exception {
Configuration conf = createConfiguration(); Configuration conf = createConfiguration();

View File

@ -101,6 +101,16 @@ Hadoop MapReduce Next Generation - Fair Scheduler
Fair Scheduler. Among them, is the use of a custom policies governing Fair Scheduler. Among them, is the use of a custom policies governing
priority “boosting” over certain apps. priority “boosting” over certain apps.
* {Automatically placing applications in queues}
The Fair Scheduler allows administrators to configure policies that
automatically place submitted applications into appropriate queues. Placement
can depend on the user and groups of the submitter and the requested queue
passed by the application. A policy consists of a set of rules that are applied
sequentially to classify an incoming application. Each rule either places the
app into a queue, rejects it, or continues on to the next rule. Refer to the
allocation file format below for how to configure these policies.
* {Installation} * {Installation}
To use the Fair Scheduler first assign the appropriate scheduler class in To use the Fair Scheduler first assign the appropriate scheduler class in
@ -138,7 +148,8 @@ Properties that can be placed in yarn-site.xml
* Whether to use the username associated with the allocation as the default * Whether to use the username associated with the allocation as the default
queue name, in the event that a queue name is not specified. If this is set queue name, in the event that a queue name is not specified. If this is set
to "false" or unset, all jobs have a shared default queue, named "default". to "false" or unset, all jobs have a shared default queue, named "default".
Defaults to true. Defaults to true. If a queue placement policy is given in the allocations
file, this property is ignored.
* <<<yarn.scheduler.fair.preemption>>> * <<<yarn.scheduler.fair.preemption>>>
@ -180,6 +191,16 @@ Properties that can be placed in yarn-site.xml
opportunities to pass up. The default value of -1.0 means don't pass up any opportunities to pass up. The default value of -1.0 means don't pass up any
scheduling opportunities. scheduling opportunities.
* <<<yarn.scheduler.fair.allow-undeclared-pools>>>
* If this is true, new queues can be created at application submission time,
whether because they are specified as the application's queue by the
submitter or because they are placed there by the user-as-default-queue
property. If this is false, any time an app would be placed in a queue that
is not specified in the allocations file, it is placed in the "default" queue
instead. Defaults to true. If a queue placement policy is given in the
allocations file, this property is ignored.
Allocation file format Allocation file format
The allocation file must be in XML format. The format contains five types of The allocation file must be in XML format. The format contains five types of
@ -248,8 +269,61 @@ Allocation file format
policy for queues; overriden by the schedulingPolicy element in each queue policy for queues; overriden by the schedulingPolicy element in each queue
if specified. Defaults to "fair". if specified. Defaults to "fair".
* <<A queuePlacementPolicy element>>, which contains a list of rule elements
that tell the scheduler how to place incoming apps into queues. Rules
are applied in the order that they are listed. Rules may take arguments. All
rules accept the "create" argument, which indicates whether the rule can create
a new queue. "Create" defaults to true; if set to false and the rule would
place the app in a queue that is not configured in the allocations file, we
continue on to the next rule. The last rule must be one that can never issue a
continue. Valid rules are:
* specified: the app is placed into the queue it requested. If the app
requested no queue, i.e. it specified "default", we continue.
* user: the app is placed into a queue with the name of the user who
submitted it.
* primaryGroup: the app is placed into a queue with the name of the
primary group of the user who submitted it.
* default: the app is placed into the queue named "default".
* reject: the app is rejected.
An example allocation file is given here: An example allocation file is given here:
---
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
<minResources>10000 mb,0vcores</minResources>
<maxResources>90000 mb,0vcores</maxResources>
<maxRunningApps>50</maxRunningApps>
<weight>2.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="sample_sub_queue">
<aclSubmitApps>charlie</aclSubmitApps>
<minResources>5000 mb,0vcores</minResources>
</queue>
</queue>
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>5</userMaxAppsDefault>
<queuePlacementPolicy>
<specified />
<primarygroup create="false" />
<default />
</queuePlacementPolicy>
</allocations>
---
Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
Queue Access Control Lists (ACLs) Queue Access Control Lists (ACLs)
Queue Access Control Lists (ACLs) allow administrators to control who may Queue Access Control Lists (ACLs) allow administrators to control who may
@ -268,28 +342,6 @@ Queue Access Control Lists (ACLs)
To start restricting access, change the root queue's ACLs to something other To start restricting access, change the root queue's ACLs to something other
than "*". than "*".
---
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
<minResources>10000 mb,0vcores</minResources>
<maxResources>90000 mb,0vcores</maxResources>
<maxRunningApps>50</maxRunningApps>
<weight>2.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="sample_sub_queue">
<aclSubmitApps>charlie</aclSubmitApps>
<minResources>5000 mb,0vcores</minResources>
</queue>
</queue>
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>5</userMaxAppsDefault>
</allocations>
---
Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
* {Administration} * {Administration}