Merge -r 1415591:1415592 from trunk to branch-2. Fixes: YARN-187. Add hierarchical queues to the fair scheduler. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1415593 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2012-11-30 12:05:13 +00:00
parent 208cfa37a9
commit 232cc5d59c
15 changed files with 974 additions and 581 deletions

View File

@ -100,6 +100,8 @@ Release 2.0.3-alpha - Unreleased
YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy)
YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite)
Release 2.0.2-alpha - 2012-09-07
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)

View File

@ -50,10 +50,10 @@ public class AppSchedulable extends Schedulable {
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
private FSQueue queue;
private FSLeafQueue queue;
private RMContainerTokenSecretManager containerTokenSecretManager;
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSQueue queue) {
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = System.currentTimeMillis();
@ -96,9 +96,6 @@ public long getStartTime() {
return startTime;
}
@Override
public void redistributeShare() {}
@Override
public Resource getResourceUsage() {
return app.getCurrentConsumption();
@ -114,7 +111,7 @@ public Resource getMinShare() {
* Get metrics reference from containing queue.
*/
public QueueMetrics getMetrics() {
return queue.getQueueSchedulable().getMetrics();
return queue.getMetrics();
}
@Override

View File

@ -22,73 +22,57 @@
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Private
@Unstable
public class FSQueueSchedulable extends Schedulable implements Queue {
public static final Log LOG = LogFactory.getLog(
FSQueueSchedulable.class.getName());
public class FSLeafQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
private final List<AppSchedulable> appScheds =
new ArrayList<AppSchedulable>();
private FairScheduler scheduler;
private FSQueue queue;
private QueueManager queueMgr;
private List<AppSchedulable> appScheds = new LinkedList<AppSchedulable>();
/** Scheduling mode for jobs inside the queue (fair or FIFO) */
private SchedulingMode schedulingMode;
private final FairScheduler scheduler;
private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
private QueueMetrics metrics;
private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// Variables used for preemption
long lastTimeAtMinShare;
long lastTimeAtHalfFairShare;
// Constructor for tests
protected FSQueueSchedulable(FairScheduler scheduler, FSQueue fsQueue,
QueueManager qMgr, QueueMetrics metrics, long minShare, long fairShare) {
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
FSParentQueue parent) {
super(name, queueMgr, scheduler, parent);
this.scheduler = scheduler;
this.queueMgr = qMgr;
this.queue = fsQueue;
this.metrics = metrics;
this.lastTimeAtMinShare = minShare;
this.lastTimeAtHalfFairShare = fairShare;
}
public FSQueueSchedulable(FairScheduler scheduler, FSQueue queue) {
this.scheduler = scheduler;
this.queue = queue;
this.queueMgr = scheduler.getQueueManager();
this.metrics = QueueMetrics.forQueue(getName(), null, true, scheduler.getConf());
this.queueMgr = queueMgr;
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
}
public void addApp(AppSchedulable app) {
appScheds.add(app);
public void addApp(FSSchedulerApp app) {
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
appScheds.add(appSchedulable);
}
// for testing
void addAppSchedulable(AppSchedulable appSched) {
appScheds.add(appSched);
}
public void removeApp(FSSchedulerApp app) {
for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
AppSchedulable appSched = it.next();
@ -98,17 +82,47 @@ public void removeApp(FSSchedulerApp app) {
}
}
}
public Collection<AppSchedulable> getAppSchedulables() {
return appScheds;
}
public void setSchedulingMode(SchedulingMode mode) {
this.schedulingMode = mode;
}
@Override
public void recomputeFairShares() {
if (schedulingMode == SchedulingMode.FAIR) {
SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
} else {
for (AppSchedulable sched: appScheds) {
sched.setFairShare(Resources.createResource(0));
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (AppSchedulable app : appScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
}
/**
* Update demand by asking apps in the queue to update
*/
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue
// Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(queue.getName());
Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0);
for (AppSchedulable sched: appScheds) {
for (AppSchedulable sched : appScheds) {
sched.updateDemand();
Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) {
@ -128,46 +142,12 @@ public void updateDemand() {
}
}
/**
* Distribute the queue's fair share among its jobs
*/
@Override
public void redistributeShare() {
if (queue.getSchedulingMode() == SchedulingMode.FAIR) {
SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
} else {
for (AppSchedulable sched: appScheds) {
sched.setFairShare(Resources.createResource(0));
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public Resource getMinShare() {
return queueMgr.getMinResources(queue.getName());
}
@Override
public double getWeight() {
return queueMgr.getQueueWeight(queue.getName());
}
@Override
public long getStartTime() {
return 0;
}
@Override
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved);
// If this queue is over its limit, reject
if (Resources.greaterThan(getResourceUsage(),
queueMgr.getMaxResources(queue.getName()))) {
queueMgr.getMaxResources(getName()))) {
return Resources.none();
}
@ -185,15 +165,14 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
// Otherwise, chose app to schedule based on given policy (fair vs fifo).
else {
SchedulingMode mode = queue.getSchedulingMode();
Comparator<Schedulable> comparator;
if (mode == SchedulingMode.FIFO) {
if (schedulingMode == SchedulingMode.FIFO) {
comparator = new SchedulingAlgorithms.FifoComparator();
} else if (mode == SchedulingMode.FAIR) {
} else if (schedulingMode == SchedulingMode.FAIR) {
comparator = new SchedulingAlgorithms.FairShareComparator();
} else {
throw new RuntimeException("Unsupported queue scheduling mode " + mode);
throw new RuntimeException("Unsupported queue scheduling mode " +
schedulingMode);
}
Collections.sort(appScheds, comparator);
@ -203,81 +182,13 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
return Resources.none();
}
}
@Override
public String getName() {
return queue.getName();
public Collection<FSQueue> getChildQueues() {
return new ArrayList<FSQueue>(1);
}
FSQueue getQueue() {
return queue;
}
public Collection<AppSchedulable> getAppSchedulables() {
return appScheds;
}
public long getLastTimeAtMinShare() {
return lastTimeAtMinShare;
}
public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
public long getLastTimeAtHalfFairShare() {
return lastTimeAtHalfFairShare;
}
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
@Override
public QueueMetrics getMetrics() {
return metrics;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (AppSchedulable app : appScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
}
@Override
public Priority getPriority() {
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
}
@Override
public Map<QueueACL, AccessControlList> getQueueAcls() {
Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
return new HashMap<QueueACL, AccessControlList>(acls);
}
@Override
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(getQueueName());
// TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
queueInfo.setQueueState(QueueState.RUNNING);
return queueInfo;
}
@Override
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
@ -294,9 +205,20 @@ public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
userAclInfo.setUserAcls(operations);
return Collections.singletonList(userAclInfo);
}
public long getLastTimeAtMinShare() {
return lastTimeAtMinShare;
}
@Override
public String getQueueName() {
return getName();
public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
public long getLastTimeAtHalfFairShare() {
return lastTimeAtHalfFairShare;
}
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
}

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
public class FSParentQueue extends FSQueue {
private static final Log LOG = LogFactory.getLog(
FSParentQueue.class.getName());
private final List<FSQueue> childQueues =
new ArrayList<FSQueue>();
private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
FSParentQueue parent) {
super(name, queueMgr, scheduler, parent);
this.queueMgr = queueMgr;
}
public void addChildQueue(FSQueue child) {
childQueues.add(child);
}
@Override
public void recomputeFairShares() {
SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
for (FSQueue childQueue : childQueues) {
childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
childQueue.recomputeFairShares();
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (FSQueue child : childQueues) {
Resources.addTo(usage, child.getResourceUsage());
}
return usage;
}
@Override
public void updateDemand() {
// Compute demand by iterating through apps in the queue
// Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0);
for (FSQueue childQueue : childQueues) {
childQueue.updateDemand();
Resource toAdd = childQueue.getDemand();
if (LOG.isDebugEnabled()) {
LOG.debug("Counting resource from " + childQueue.getName() + " " +
toAdd + "; Total resource consumption for " + getName() +
" now " + demand);
}
demand = Resources.add(demand, toAdd);
if (Resources.greaterThanOrEqual(demand, maxRes)) {
demand = maxRes;
break;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand +
"; the max is " + maxRes);
}
}
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
synchronized (this) {
if (getQueueAcls().get(acl).isUserAllowed(user)) {
return true;
}
}
if (parent != null) {
return parent.hasAccess(acl, user);
}
return false;
}
private synchronized QueueUserACLInfo getUserAclInfo(
UserGroupInformation user) {
QueueUserACLInfo userAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
operations.add(operation);
}
}
userAclInfo.setQueueName(getQueueName());
userAclInfo.setUserAcls(operations);
return userAclInfo;
}
@Override
public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
UserGroupInformation user) {
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
// Add queue acls
userAcls.add(getUserAclInfo(user));
// Add children queue acls
for (FSQueue child : childQueues) {
userAcls.addAll(child.getQueueUserAclInfo(user));
}
return userAcls;
}
@Override
public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
throw new IllegalStateException(
"Parent queue should not be assigned container");
}
@Override
public Collection<FSQueue> getChildQueues() {
return childQueues;
}
}

View File

@ -20,65 +20,112 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
/**
* A queue containing several applications.
*/
@Private
@Unstable
public class FSQueue {
/** Queue name. */
private String name;
/** Applications in this specific queue; does not include children queues' jobs. */
private Collection<FSSchedulerApp> applications =
new ArrayList<FSSchedulerApp>();
/** Scheduling mode for jobs inside the queue (fair or FIFO) */
private SchedulingMode schedulingMode;
private FairScheduler scheduler;
private FSQueueSchedulable queueSchedulable;
public FSQueue(FairScheduler scheduler, String name) {
public abstract class FSQueue extends Schedulable implements Queue {
private final String name;
private final QueueManager queueMgr;
private final FairScheduler scheduler;
private final QueueMetrics metrics;
protected final FSParentQueue parent;
protected final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
public FSQueue(String name, QueueManager queueMgr,
FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
this.queueSchedulable = new FSQueueSchedulable(scheduler, this);
this.queueMgr = queueMgr;
this.scheduler = scheduler;
this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
this.parent = parent;
}
public Collection<FSSchedulerApp> getApplications() {
return applications;
}
public void addApp(FSSchedulerApp app) {
applications.add(app);
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
queueSchedulable.addApp(appSchedulable);
}
public void removeApp(FSSchedulerApp app) {
applications.remove(app);
queueSchedulable.removeApp(app);
}
public String getName() {
return name;
}
public SchedulingMode getSchedulingMode() {
return schedulingMode;
@Override
public String getQueueName() {
return name;
}
@Override
public double getWeight() {
return queueMgr.getQueueWeight(getName());
}
@Override
public Resource getMinShare() {
return queueMgr.getMinResources(getName());
}
public void setSchedulingMode(SchedulingMode schedulingMode) {
this.schedulingMode = schedulingMode;
@Override
public long getStartTime() {
return 0;
}
public FSQueueSchedulable getQueueSchedulable() {
return queueSchedulable;
@Override
public Priority getPriority() {
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
}
@Override
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(getQueueName());
// TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
scheduler.getClusterCapacity().getMemory());
ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
if (includeChildQueues) {
Collection<FSQueue> childQueues = getChildQueues();
for (FSQueue child : childQueues) {
childQueueInfos.add(child.getQueueInfo(recursive, recursive));
}
}
queueInfo.setChildQueues(childQueueInfos);
queueInfo.setQueueState(QueueState.RUNNING);
return queueInfo;
}
@Override
public Map<QueueACL, AccessControlList> getQueueAcls() {
Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
return new HashMap<QueueACL, AccessControlList>(acls);
}
@Override
public QueueMetrics getMetrics() {
return metrics;
}
/**
* Recomputes the fair shares for all queues and applications
* under this queue.
*/
public abstract void recomputeFairShares();
/**
* Gets the children of this queue, if any.
*/
public abstract Collection<FSQueue> getChildQueues();
}

View File

@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@ -75,6 +76,25 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
/**
* A scheduler that schedules resources between a set of queues. The scheduler
* keeps track of the resources used by each queue, and attempts to maintain
* fairness by scheduling tasks at queues whose allocations are farthest below
* an ideal fair distribution.
*
* The fair scheduler supports hierarchical queues. All queues descend from a
* queue named "root". Available resources are distributed among the children
* of the root queue in the typical fair scheduling fashion. Then, the children
* distribute the resources assigned to them to their children in the same
* fashion. Applications may only be scheduled on leaf queues. Queues can be
* specified as children of other queues by placing them as sub-elements of their
* parents in the fair scheduler configuration file.
*
* A queue's name starts with the names of its parents, with periods as
* separators. So a queue named "queue1" under the root named, would be
* referred to as "root.queue1", and a queue named "queue2" under a queue
* named "parent1" would be referred to as "root.parent1.queue2".
*/
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
@ -105,23 +125,22 @@ public class FairScheduler implements ResourceScheduler {
// Aggregate metrics
QueueMetrics rootMetrics;
//Time when we last updated preemption vars
// Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
//Time we last ran preemptTasksIfNecessary
// Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
protected Map<ApplicationAttemptId, FSSchedulerApp> applications
= new HashMap<ApplicationAttemptId, FSSchedulerApp>();
protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =
private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
private Resource clusterCapacity =
private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple
@ -131,10 +150,11 @@ public class FairScheduler implements ResourceScheduler {
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per heartbeat
protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality
private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
public FairScheduler() {
@ -150,16 +170,8 @@ public QueueManager getQueueManager() {
return queueMgr;
}
public List<FSQueueSchedulable> getQueueSchedulables() {
List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
for (FSQueue queue: queueMgr.getQueues()) {
scheds.add(queue.getQueueSchedulable());
}
return scheds;
}
private RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp application =
FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@ -183,34 +195,24 @@ public void run() {
}
/**
* Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
* Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job.
*/
protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
// Update demands of apps and queues
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().updateDemand();
}
FSQueue rootQueue = queueMgr.getRootQueue();
// Compute fair shares based on updated demands
List<FSQueueSchedulable> queueScheds = getQueueSchedulables();
SchedulingAlgorithms.computeFairShares(
queueScheds, clusterCapacity);
// Recursively update demands for all queues
rootQueue.updateDemand();
// Update queue metrics for this queue
for (FSQueueSchedulable sched : queueScheds) {
sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
}
// Use the computed shares to assign shares within each queue
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().redistributeShare();
}
rootQueue.setFairShare(clusterCapacity);
// Recursively compute fair shares for all queues
// and update metrics
rootQueue.recomputeFairShares();
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
@ -225,7 +227,7 @@ protected synchronized void update() {
private void updatePreemptionVariables() {
long now = clock.getTime();
lastPreemptionUpdateTime = now;
for (FSQueueSchedulable sched: getQueueSchedulables()) {
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now);
}
@ -238,16 +240,16 @@ private void updatePreemptionVariables() {
/**
* Is a queue below its min share for the given task type?
*/
boolean isStarvedForMinShare(FSQueueSchedulable sched) {
boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredShare);
}
/**
* Is a queue being starved for fair share for the given task type?
* This is defined as being below half its fair share.
* Is a queue being starved for fair share for the given task type? This is
* defined as being below half its fair share.
*/
boolean isStarvedForFairShare(FSQueueSchedulable sched) {
boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.max(
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@ -255,10 +257,10 @@ boolean isStarvedForFairShare(FSQueueSchedulable sched) {
/**
* Check for queues that need tasks preempted, either because they have been
* below their guaranteed share for minSharePreemptionTimeout or they
* have been below half their fair share for the fairSharePreemptionTimeout.
* If such queues exist, compute how many tasks of each type need to be
* preempted and then select the right ones using preemptTasks.
* below their guaranteed share for minSharePreemptionTimeout or they have
* been below half their fair share for the fairSharePreemptionTimeout. If
* such queues exist, compute how many tasks of each type need to be preempted
* and then select the right ones using preemptTasks.
*/
protected synchronized void preemptTasksIfNecessary() {
if (!preemptionEnabled) {
@ -273,35 +275,37 @@ protected synchronized void preemptTasksIfNecessary() {
Resource resToPreempt = Resources.none();
for (FSQueueSchedulable sched: getQueueSchedulables()) {
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(resToPreempt, Resources.none())) {
preemptResources(getQueueSchedulables(), resToPreempt);
preemptResources(queueMgr.getLeafQueues(), resToPreempt);
}
}
/**
* Preempt a quantity of resources from a list of QueueSchedulables.
* The policy for this is to pick apps from queues that are over their fair
* share, but make sure that no queue is placed below its fair share in the
* process. We further prioritize preemption by choosing containers with
* lowest priority to preempt.
* Preempt a quantity of resources from a list of QueueSchedulables. The
* policy for this is to pick apps from queues that are over their fair share,
* but make sure that no queue is placed below its fair share in the process.
* We further prioritize preemption by choosing containers with lowest
* priority to preempt.
*/
protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
protected void preemptResources(Collection<FSLeafQueue> scheds,
Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
return;
}
Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>();
Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
Map<RMContainer, FSLeafQueue> queues =
new HashMap<RMContainer, FSLeafQueue>();
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
for (FSQueueSchedulable sched: scheds) {
for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as: sched.getAppSchedulables()) {
for (AppSchedulable as : sched.getAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@ -321,12 +325,12 @@ public int compare(RMContainer c1, RMContainer c2) {
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue
for (RMContainer container: runningContainers) {
FSQueueSchedulable sched = queues.get(container);
for (RMContainer container : runningContainers) {
FSLeafQueue sched = queues.get(container);
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + sched.getQueue().getName());
") from queue " + sched.getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
@ -348,12 +352,12 @@ public int compare(RMContainer c1, RMContainer c2) {
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the
* fairSharePreemptionTimeout, it should preempt enough tasks to get up to
* its full fair share. If both conditions hold, we preempt the max of the
* two amounts (this shouldn't happen unless someone sets the timeouts to
* be identical for some reason).
* fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
* full fair share. If both conditions hold, we preempt the max of the two
* amounts (this shouldn't happen unless someone sets the timeouts to be
* identical for some reason).
*/
protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@ -362,7 +366,7 @@ protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(Resources.none(),
Resources.subtract(target, sched.getResourceUsage()));
Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@ -380,15 +384,15 @@ protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
}
/**
* This updates the runnability of all apps based on whether or not
* any users/queues have exceeded their capacity.
* This updates the runnability of all apps based on whether or not any
* users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
for (FSQueue p: queueMgr.getQueues()) {
for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
@ -400,7 +404,7 @@ private void updateRunnability() {
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
for (AppSchedulable app: apps) {
for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@ -473,22 +477,25 @@ public FairSchedulerEventLog getEventLog() {
}
/**
* Add a new application to the scheduler, with a given id, queue name,
* and user. This will accept a new app even if the user or queue is above
* Add a new application to the scheduler, with a given id, queue name, and
* user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
protected synchronized void
addApplication(ApplicationAttemptId applicationAttemptId,
String queueName, String user) {
protected synchronized void addApplication(
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
FSQueue queue = queueMgr.getQueue(queueName);
FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
if (queue == null) {
// queue is not an existing or createable leaf queue
queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
// Inforce ACLs
// Enforce ACLs
UserGroupInformation userUgi;
try {
userUgi = UserGroupInformation.getCurrentUser();
@ -497,8 +504,8 @@ public FairSchedulerEventLog getEventLog() {
return;
}
List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
userUgi); // Always a signleton list
// Always a singleton list
List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
LOG.info("User " + userUgi.getUserName() +
" cannot submit" + " applications to queue " + queue.getName());
@ -506,14 +513,13 @@ public FairSchedulerEventLog getEventLog() {
}
queue.addApp(schedulerApp);
queue.getQueueSchedulable().getMetrics().submitApp(user,
applicationAttemptId.getAttemptId());
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
LOG.info("Application Submission: " + applicationAttemptId +
", user: " + user +
", user: "+ user +
", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
@ -540,10 +546,10 @@ private synchronized void removeApplication(
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL);
RMContainerEventType.KILL);
}
// Release all reserved containers
// Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@ -556,7 +562,8 @@ private synchronized void removeApplication(
application.stop(rmAppAttemptFinalState);
// Inform the queue
FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName());
FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName());
queue.removeApp(application);
// Remove from our data-structure
@ -658,11 +665,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "FairScheduler",
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId);
RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "FairScheduler",
"Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@ -675,8 +682,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + appAttemptId +
" application=" + application.getApplicationId());
" applicationAttemptId=" + appAttemptId +
" application=" + application.getApplicationId());
}
application.showRequests();
@ -689,19 +696,17 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
if (LOG.isDebugEnabled()) {
LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
" applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size());
}
return new Allocation(
application.pullNewlyAllocatedContainers(),
return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom());
}
}
/**
* Process a container which has launched on a node, as reported by the
* node.
* Process a container which has launched on a node, as reported by the node.
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
@ -757,20 +762,20 @@ private synchronized void nodeUpdate(RMNode nm,
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
queue.getQueueSchedulable().assignContainer(node, true);
FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
queue.assignContainer(node, true);
}
// Otherwise, schedule at queue which is furthest below fair share
else {
int assignedContainers = 0;
while (true) {
// At most one task is scheduled each iteration of this loop
List<FSQueueSchedulable> scheds = getQueueSchedulables();
List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
queueMgr.getLeafQueues());
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false;
for (FSQueueSchedulable sched : scheds) {
for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
if (Resources.greaterThan(assigned, Resources.none())) {
eventLog.log("ASSIGN", nm.getHostName(), assigned);
@ -813,7 +818,7 @@ public QueueMetrics getRootQueueMetrics() {
@Override
public void handle(SchedulerEvent event) {
switch(event.getType()) {
switch (event.getType()) {
case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
@ -832,8 +837,7 @@ public void handle(SchedulerEvent event) {
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent =
(NodeUpdateSchedulerEvent)event;
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
@ -842,7 +846,7 @@ public void handle(SchedulerEvent event) {
if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
String queue = appAddedEvent.getQueue();
// Potentially set queue to username if configured to do so
@ -867,7 +871,7 @@ public void handle(SchedulerEvent event) {
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event;
(ContainerExpiredSchedulerEvent)event;
ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
@ -886,8 +890,8 @@ public void recover(RMState state) throws Exception {
}
@Override
public synchronized void
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
@ -909,11 +913,10 @@ public void recover(RMState state) throws Exception {
try {
queueMgr.initialize();
}
catch (Exception e) {
} catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
Thread updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
@ -925,10 +928,9 @@ public void recover(RMState state) throws Exception {
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
try {
queueMgr.reloadAllocs();
queueMgr.reloadAllocs();
}
catch (Exception e) {
} catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
@ -940,8 +942,8 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues,
if (!queueMgr.exists(queueName)) {
return null;
}
return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
includeChildQueues, recursive);
return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
recursive);
}
@Override
@ -953,12 +955,7 @@ public List<QueueUserACLInfo> getQueueUserAclInfo() {
return new ArrayList<QueueUserACLInfo>();
}
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
for (FSQueue queue : queueMgr.getQueues()) {
userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
}
return userAcls;
return queueMgr.getRootQueue().getQueueUserAclInfo(user);
}
@Override

View File

@ -27,6 +27,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@ -52,6 +53,7 @@
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
*
*/
@Private
@Unstable
@ -59,6 +61,8 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
public static final String ROOT_QUEUE = "root";
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
@ -76,7 +80,10 @@ public class QueueManager {
// used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used).
private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private final Collection<FSLeafQueue> leafQueues =
new CopyOnWriteArrayList<FSLeafQueue>();
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo();
@ -87,10 +94,17 @@ public class QueueManager {
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
public FSParentQueue getRootQueue() {
return rootQueue;
}
public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
FairSchedulerConfiguration conf = scheduler.getConf();
rootQueue = new FSParentQueue("root", this, scheduler, null);
queues.put(rootQueue.getName(), rootQueue);
this.allocFile = conf.getAllocationFile();
if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation
@ -106,21 +120,106 @@ public void initialize() throws IOException, SAXException,
lastSuccessfulReload = scheduler.getClock().getTime();
lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue
getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
/**
* Get a queue by name, creating it if necessary
* Get a queue by name, creating it if necessary. If the queue
* is not or can not be a leaf queue, i.e. it already exists as a parent queue,
* or one of the parents in its name is already a leaf queue, null is returned.
*
* The root part of the name is optional, so a queue underneath the root
* named "queue1" could be referred to as just "queue1", and a queue named
* "queue2" underneath a parent named "parent1" that is underneath the root
* could be referred to as just "parent1.queue2".
*/
public FSQueue getQueue(String name) {
public FSLeafQueue getLeafQueue(String name) {
if (!name.startsWith(ROOT_QUEUE + ".")) {
name = ROOT_QUEUE + "." + name;
}
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null) {
queue = new FSQueue(scheduler, name);
queue.setSchedulingMode(info.defaultSchedulingMode);
queues.put(name, queue);
FSLeafQueue leafQueue = createLeafQueue(name);
if (leafQueue == null) {
return null;
}
leafQueue.setSchedulingMode(info.defaultSchedulingMode);
queue = leafQueue;
} else if (queue instanceof FSParentQueue) {
return null;
}
return queue;
return (FSLeafQueue)queue;
}
}
/**
* Creates a leaf queue and places it in the tree. Creates any
* parents that don't already exist.
*
* @return
* the created queue, if successful. null if not allowed (one of the parent
* queues in the queue name is already a leaf queue)
*/
private FSLeafQueue createLeafQueue(String name) {
List<String> newQueueNames = new ArrayList<String>();
newQueueNames.add(name);
int sepIndex = name.length();
FSParentQueue parent = null;
// Move up the queue tree until we reach one that exists.
while (sepIndex != -1) {
sepIndex = name.lastIndexOf('.', sepIndex-1);
FSQueue queue;
String curName = null;
curName = name.substring(0, sepIndex);
queue = queues.get(curName);
if (queue == null) {
newQueueNames.add(curName);
} else {
if (queue instanceof FSParentQueue) {
parent = (FSParentQueue)queue;
break;
} else {
return null;
}
}
}
// At this point, parent refers to the deepest existing parent of the
// queue to create.
// Now that we know everything worked out, make all the queues
// and add them to the map.
FSLeafQueue leafQueue = null;
for (int i = newQueueNames.size()-1; i >= 0; i--) {
String queueName = newQueueNames.get(i);
if (i == 0) {
// First name added was the leaf queue
leafQueue = new FSLeafQueue(name, this, scheduler, parent);
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
} else {
FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent);
parent = newParent;
}
}
return leafQueue;
}
/**
* Gets a queue by name.
*/
public FSQueue getQueue(String name) {
if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
name = ROOT_QUEUE + "." + name;
}
synchronized (queues) {
return queues.get(name);
}
}
@ -136,8 +235,8 @@ public boolean exists(String name) {
/**
* Get the queue for a given AppSchedulable.
*/
public FSQueue getQueueForApp(AppSchedulable app) {
return getQueue(app.getApp().getQueueName());
public FSLeafQueue getQueueForApp(AppSchedulable app) {
return getLeafQueue(app.getApp().getQueueName());
}
/**
@ -237,54 +336,9 @@ public void reloadAllocs() throws IOException, ParserConfigurationException,
Element element = (Element)node;
if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) {
String queueName = element.getAttribute("name");
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
queueNamesInAllocFile.add(queueName);
NodeList fields = element.getChildNodes();
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
minQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
maxQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
queueWeights.put(queueName, val);
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
queueModes.put(queueName, parseSchedulingMode(text));
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
}
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
&& Resources.lessThan(maxQueueResources.get(queueName),
minQueueResources.get(queueName))) {
LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
}
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
@ -331,7 +385,7 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) {
FSQueue queue = getQueue(name);
FSLeafQueue queue = getLeafQueue(name);
if (queueModes.containsKey(name)) {
queue.setSchedulingMode(queueModes.get(name));
} else {
@ -340,6 +394,75 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
}
}
}
/**
* Loads a queue from a queue element in the configuration file
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
throws AllocationConfigurationException {
String queueName = parentName + "." + element.getAttribute("name");
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
NodeList fields = element.getChildNodes();
boolean isLeaf = true;
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
minQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
maxQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
queueWeights.put(queueName, val);
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
queueModes.put(queueName, parseSchedulingMode(text));
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
} else if ("queue".endsWith(field.getTagName()) ||
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
isLeaf = false;
}
}
if (isLeaf) {
queueNamesInAllocFile.add(queueName);
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
&& Resources.lessThan(maxQueueResources.get(queueName),
minQueueResources.get(queueName))) {
LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
}
}
private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException {
@ -384,9 +507,9 @@ public Resource getMaxResources(String queueName) {
/**
* Get a collection of all queues
*/
public Collection<FSQueue> getQueues() {
public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
return new ArrayList<FSQueue>(queues.values());
return leafQueues;
}
}

View File

@ -91,12 +91,6 @@ abstract class Schedulable {
/** Refresh the Schedulable's demand and those of its children if any. */
public abstract void updateDemand();
/**
* Distribute the fair share assigned to this Schedulable among its
* children (used in queues where the internal scheduler is fair sharing).
*/
public abstract void redistributeShare();
/**
* Assign a container on this node if possible, and return the amount of
* resources assigned. If {@code reserved} is true, it means a reservation

View File

@ -23,7 +23,7 @@
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
public class FairSchedulerInfo {
@ -32,9 +32,9 @@ public class FairSchedulerInfo {
public FairSchedulerInfo(FairScheduler fs) {
scheduler = fs;
Collection<FSQueue> queues = fs.getQueueManager().getQueues();
Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
queueInfos = new ArrayList<FairSchedulerQueueInfo>();
for (FSQueue queue : queues) {
for (FSLeafQueue queue : queues) {
queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
}
}

View File

@ -22,9 +22,8 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
@ -49,17 +48,16 @@ public class FairSchedulerQueueInfo {
private String queueName;
public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
Collection<FSSchedulerApp> apps = queue.getApplications();
for (FSSchedulerApp app : apps) {
if (app.isPending()) {
public FairSchedulerQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
Collection<AppSchedulable> apps = queue.getAppSchedulables();
for (AppSchedulable app : apps) {
if (app.getApp().isPending()) {
numPendingApps++;
} else {
numActiveApps++;
}
}
FSQueueSchedulable schedulable = queue.getQueueSchedulable();
QueueManager manager = scheduler.getQueueManager();
queueName = queue.getName();
@ -67,11 +65,11 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
Resource clusterMax = scheduler.getClusterCapacity();
clusterMaxMem = clusterMax.getMemory();
usedResources = schedulable.getResourceUsage();
usedResources = queue.getResourceUsage();
fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
fairShare = schedulable.getFairShare().getMemory();
minResources = schedulable.getMinShare();
fairShare = queue.getFairShare().getMemory();
minResources = queue.getMinShare();
minShare = minResources.getMemory();
maxResources = scheduler.getQueueManager().getMaxResources(queueName);
if (maxResources.getMemory() > clusterMaxMem) {

View File

@ -107,9 +107,6 @@ public Resource getMinShare() {
return minShare;
}
@Override
public void redistributeShare() {}
@Override
public void updateDemand() {}
}

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSLeafQueue {
private FSLeafQueue schedulable = null;
private Resource maxResource = Resources.createResource(10);
@Before
public void setup() throws IOException {
FairScheduler scheduler = new FairScheduler();
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
RMStateStore store = StoreFactory.getStore(conf);
ResourceManager resourceManager = new ResourceManager(store);
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
String queueName = "root.queue1";
QueueManager mockMgr = mock(QueueManager.class);
when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource);
schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null);
}
@Test
public void testUpdateDemand() {
AppSchedulable app = mock(AppSchedulable.class);
Mockito.when(app.getDemand()).thenReturn(maxResource);
schedulable.addAppSchedulable(app);
schedulable.addAppSchedulable(app);
schedulable.updateDemand();
assertTrue("Demand is greater than max allowed ",
Resources.equals(schedulable.getDemand(), maxResource));
}
private Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
return conf;
}
}

View File

@ -1,42 +0,0 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSQueueSchedulable {
private FSQueueSchedulable schedulable = null;
private Resource maxResource = Resources.createResource(10);
@Before
public void setup() {
String queueName = "testFSQueue";
FSQueue mockQueue = mock(FSQueue.class);
when(mockQueue.getName()).thenReturn(queueName);
QueueManager mockMgr = mock(QueueManager.class);
when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource);
schedulable = new FSQueueSchedulable(null, mockQueue, mockMgr, null, 0, 0);
}
@Test
public void testUpdateDemand() {
AppSchedulable app = mock(AppSchedulable.class);
Mockito.when(app.getDemand()).thenReturn(maxResource);
schedulable.addApp(app);
schedulable.addApp(app);
schedulable.updateDemand();
assertTrue("Demand is greater than max allowed ",
Resources.equals(schedulable.getDemand(), maxResource));
}
}

View File

@ -31,6 +31,10 @@
import java.util.List;
import java.util.Map;
import javax.xml.parsers.ParserConfigurationException;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock;
@ -61,6 +65,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.xml.sax.SAXException;
public class TestFairScheduler {
@ -195,15 +200,64 @@ public void testSimpleFairShareCalculation() {
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
for (FSQueue p : queues) {
if (p.getName() != "default") {
assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory());
for (FSLeafQueue p : queues) {
if (!p.getName().equals("root.default")) {
assertEquals(5120, p.getFairShare().getMemory());
}
}
}
@Test
public void testSimpleHierarchicalFairShareCalculation() {
// Add one big node (only care about aggregate capacity)
int capacity = 10 * 24;
RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity));
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Have two queues which want entire cluster capacity
createSchedulingRequest(10 * 1024, "queue1", "user1");
createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
scheduler.update();
QueueManager queueManager = scheduler.getQueueManager();
Collection<FSLeafQueue> queues = queueManager.getLeafQueues();
assertEquals(4, queues.size());
FSLeafQueue queue1 = queueManager.getLeafQueue("queue1");
FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2");
FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3");
assertEquals(capacity / 2, queue1.getFairShare().getMemory());
assertEquals(capacity / 4, queue2.getFairShare().getMemory());
assertEquals(capacity / 4, queue3.getFairShare().getMemory());
}
@Test
public void testHierarchicalQueuesSimilarParents() {
QueueManager queueManager = scheduler.getQueueManager();
FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child");
Assert.assertEquals(2, queueManager.getLeafQueues().size());
Assert.assertNotNull(leafQueue);
Assert.assertEquals("root.parent.child", leafQueue.getName());
FSLeafQueue leafQueue2 = queueManager.getLeafQueue("parent");
Assert.assertNull(leafQueue2);
Assert.assertEquals(2, queueManager.getLeafQueues().size());
FSLeafQueue leafQueue3 = queueManager.getLeafQueue("parent.child.grandchild");
Assert.assertNull(leafQueue3);
Assert.assertEquals(2, queueManager.getLeafQueues().size());
FSLeafQueue leafQueue4 = queueManager.getLeafQueue("parent.sister");
Assert.assertNotNull(leafQueue4);
Assert.assertEquals("root.parent.sister", leafQueue4.getName());
Assert.assertEquals(3, queueManager.getLeafQueues().size());
}
@Test
public void testSimpleContainerAllocation() {
@ -228,14 +282,14 @@ public void testSimpleContainerAllocation() {
// Asked for less than min_allocation.
assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory());
getResourceUsage().getMemory());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory());
getResourceUsage().getMemory());
}
@Test
@ -254,7 +308,7 @@ public void testSimpleContainerReservation() throws InterruptedException {
// Make sure queue 1 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory());
getResourceUsage().getMemory());
// Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@ -263,7 +317,7 @@ public void testSimpleContainerReservation() throws InterruptedException {
// Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
getQueueSchedulable().getResourceUsage().getMemory());
getResourceUsage().getMemory());
assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
// Now another node checks in with capacity
@ -276,7 +330,7 @@ public void testSimpleContainerReservation() throws InterruptedException {
// Make sure this goes to queue 2
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
getQueueSchedulable().getResourceUsage().getMemory());
getResourceUsage().getMemory());
// The old reservation should still be there...
assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
@ -294,17 +348,22 @@ public void testUserAsDefaultQueue() throws Exception {
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
createAppAttemptId(1, 1), "default", "user1");
scheduler.handle(appAddedEvent);
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1")
.getAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default")
.getAppSchedulables().size());
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.reinitialize(conf, resourceManager.getRMContext());
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2);
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size());
assertEquals(0, scheduler.getQueueManager().getQueue("user2").getApplications().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1")
.getAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default")
.getAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2")
.getAppSchedulables().size());
}
@Test
@ -338,18 +397,17 @@ public void testFairShareWithMinAlloc() throws Exception {
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
for (FSQueue p : queues) {
if (p.getName().equals("queueA")) {
assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory());
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueA")) {
assertEquals(1024, p.getFairShare().getMemory());
}
else if (p.getName().equals("queueB")) {
assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory());
else if (p.getName().equals("root.queueB")) {
assertEquals(2048, p.getFairShare().getMemory());
}
}
}
/**
@ -358,11 +416,11 @@ else if (p.getName().equals("queueB")) {
@Test
public void testQueueDemandCalculation() throws Exception {
ApplicationAttemptId id11 = createAppAttemptId(1, 1);
scheduler.addApplication(id11, "queue1", "user1");
scheduler.addApplication(id11, "root.queue1", "user1");
ApplicationAttemptId id21 = createAppAttemptId(2, 1);
scheduler.addApplication(id21, "queue2", "user1");
scheduler.addApplication(id21, "root.queue2", "user1");
ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22, "queue2", "user1");
scheduler.addApplication(id22, "root.queue2", "user1");
int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;
@ -388,10 +446,10 @@ public void testQueueDemandCalculation() throws Exception {
scheduler.update();
assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("queue1")
.getQueueSchedulable().getDemand().getMemory());
assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("root.queue1")
.getDemand().getMemory());
assertEquals(2 * minReqSize + 2 * minReqSize + (2 * minReqSize), scheduler
.getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand()
.getQueueManager().getQueue("root.queue2").getDemand()
.getMemory());
}
@ -402,10 +460,11 @@ public void testAppAdditionAndRemoval() throws Exception {
scheduler.handle(appAddedEvent1);
// Scheduler should have two queues (the default and the one created for user1)
assertEquals(2, scheduler.getQueueManager().getQueues().size());
assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
// That queue should have one app
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1")
.getAppSchedulables().size());
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
@ -414,7 +473,8 @@ public void testAppAdditionAndRemoval() throws Exception {
scheduler.handle(appRemovedEvent1);
// Queue should have no apps
assertEquals(0, scheduler.getQueueManager().getQueue("user1").getApplications().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1")
.getAppSchedulables().size());
}
@Test
@ -466,60 +526,98 @@ public void testAllocationFileParsing() throws Exception {
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024),
queueManager.getMinResources("queueA"));
queueManager.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048),
queueManager.getMinResources("queueB"));
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueC"));
queueManager.getMinResources("root.queueC"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueD"));
queueManager.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueE"));
queueManager.getMinResources("root.queueE"));
assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("queueA"));
assertEquals(15, queueManager.getQueueMaxApps("queueB"));
assertEquals(15, queueManager.getQueueMaxApps("queueC"));
assertEquals(3, queueManager.getQueueMaxApps("queueD"));
assertEquals(15, queueManager.getQueueMaxApps("queueE"));
assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
assertEquals(10, queueManager.getUserMaxApps("user1"));
assertEquals(5, queueManager.getUserMaxApps("user2"));
// Unspecified queues should get default ACL
Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA");
Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("root.queueA");
assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("root.queueB");
assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue c ACL
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("root.queueC");
assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
}
@Test
public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
Configuration conf = createConfiguration();
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
scheduler.reinitialize(conf, resourceManager.getRMContext());
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048</minResources>");
out.println("<queue name=\"queueC\">");
out.println("<minResources>2048</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<minResources>2048</minResources>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
Collection<FSLeafQueue> leafQueues = queueManager.getLeafQueues();
Assert.assertEquals(4, leafQueues.size());
Assert.assertNotNull(queueManager.getLeafQueue("queueA"));
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC"));
Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD"));
Assert.assertNotNull(queueManager.getLeafQueue("default"));
// Make sure querying for queues didn't create any new ones:
Assert.assertEquals(4, leafQueues.size());
}
@Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = createConfiguration();
@ -569,29 +667,29 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize();
assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue
assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME));
queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024),
queueManager.getMinResources("queueA"));
queueManager.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048),
queueManager.getMinResources("queueB"));
queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueC"));
queueManager.getMinResources("root.queueC"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueD"));
queueManager.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueE"));
queueManager.getMinResources("root.queueE"));
assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("queueA"));
assertEquals(15, queueManager.getQueueMaxApps("queueB"));
assertEquals(15, queueManager.getQueueMaxApps("queueC"));
assertEquals(3, queueManager.getQueueMaxApps("queueD"));
assertEquals(15, queueManager.getQueueMaxApps("queueE"));
assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
assertEquals(10, queueManager.getUserMaxApps("user1"));
assertEquals(5, queueManager.getUserMaxApps("user2"));
@ -603,23 +701,23 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB");
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("root.queueB");
assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue c ACL
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC");
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("root.queueC");
assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueManager.getMinSharePreemptionTimeout(
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
}
@ -659,25 +757,25 @@ public void testIsStarvedForMinShare() throws Exception {
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Queue A should be above min share, B below.
for (FSQueue p : queues) {
if (p.getName().equals("queueA")) {
assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueA")) {
assertEquals(false, scheduler.isStarvedForMinShare(p));
}
else if (p.getName().equals("queueB")) {
assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
else if (p.getName().equals("root.queueB")) {
assertEquals(true, scheduler.isStarvedForMinShare(p));
}
}
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// Now B should have min share ( = demand here)
for (FSQueue p : queues) {
if (p.getName().equals("queueB")) {
assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable()));
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueB")) {
assertEquals(false, scheduler.isStarvedForMinShare(p));
}
}
}
@ -718,16 +816,16 @@ public void testIsStarvedForFairShare() throws Exception {
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Queue A should be above fair share, B below.
for (FSQueue p : queues) {
if (p.getName().equals("queueA")) {
assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueA")) {
assertEquals(false, scheduler.isStarvedForFairShare(p));
}
else if (p.getName().equals("queueB")) {
assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
else if (p.getName().equals("root.queueB")) {
assertEquals(true, scheduler.isStarvedForFairShare(p));
}
}
@ -735,9 +833,9 @@ else if (p.getName().equals("queueB")) {
scheduler.handle(nodeEvent2);
// B should not be starved for fair share, since entire demand is
// satisfied.
for (FSQueue p : queues) {
if (p.getName().equals("queueB")) {
assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable()));
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueB")) {
assertEquals(false, scheduler.isStarvedForFairShare(p));
}
}
}
@ -845,7 +943,7 @@ public void testChoiceOfPreemptedContainers() throws Exception {
// We should be able to claw back one container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueSchedulables(),
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
@ -856,7 +954,7 @@ public void testChoiceOfPreemptedContainers() throws Exception {
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueSchedulables(),
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
@ -866,7 +964,7 @@ public void testChoiceOfPreemptedContainers() throws Exception {
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything
scheduler.preemptResources(scheduler.getQueueSchedulables(),
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
@ -977,10 +1075,10 @@ public void testPreemptionDecision() throws Exception {
scheduler.update();
FSQueueSchedulable schedC =
scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable();
FSQueueSchedulable schedD =
scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable();
FSLeafQueue schedC =
scheduler.getQueueManager().getLeafQueue("queueC");
FSLeafQueue schedD =
scheduler.getQueueManager().getLeafQueue("queueD");
assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));

View File

@ -53,6 +53,22 @@ Hadoop MapReduce Next Generation - Fair Scheduler
capacity between the running apps. queues can also be given weights to share
the cluster non-proportionally in the config file.
The fair scheduler supports hierarchical queues. All queues descend from a
queue named "root". Available resources are distributed among the children
of the root queue in the typical fair scheduling fashion. Then, the children
distribute the resources assigned to them to their children in the same
fashion. Applications may only be scheduled on leaf queues. Queues can be
specified as children of other queues by placing them as sub-elements of
their parents in the fair scheduler configuration file.
A queue's name starts with the names of its parents, with periods as
separators. So a queue named "queue1" under the root named, would be
referred to as "root.queue1", and a queue named "queue2" under a queue
named "parent1" would be referred to as "root.parent1.queue2". When
referring to queues, the root part of the name is optional, so queue1 could
be referred to as just "queue1", and a queue2 could be referred to as just
"parent1.queue2".
In addition to providing fair sharing, the Fair Scheduler allows assigning
guaranteed minimum shares to queues, which is useful for ensuring that
certain users, groups or production applications always get sufficient
@ -163,11 +179,14 @@ Allocation file format
<?xml version="1.0"?>
<allocations>
<queue name="sample_queue">
<minResources>100000</minResources>
<maxResources>900000</maxResources>
<minResources>10000</minResources>
<maxResources>90000</maxResources>
<maxRunningApps>50</maxRunningApps>
<weight>2.0</weight>
<schedulingMode>fair</schedulingMode>
<queue name="sample_sub_queue">
<minResources>5000</minResources>
</queue>
</queue>
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>