YARN-187. Add hierarchical queues to the fair scheduler. Contributed by Sandy Ryza.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1415592 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Thomas White 2012-11-30 12:03:25 +00:00
parent 6b08d7de5f
commit ae6f1123f5
15 changed files with 974 additions and 581 deletions

View File

@ -117,6 +117,8 @@ Release 2.0.3-alpha - Unreleased
YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy) YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy)
YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite)
Release 2.0.2-alpha - 2012-09-07 Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -50,10 +50,10 @@ public class AppSchedulable extends Schedulable {
private long startTime; private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class); private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
private FSQueue queue; private FSLeafQueue queue;
private RMContainerTokenSecretManager containerTokenSecretManager; private RMContainerTokenSecretManager containerTokenSecretManager;
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSQueue queue) { public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler; this.scheduler = scheduler;
this.app = app; this.app = app;
this.startTime = System.currentTimeMillis(); this.startTime = System.currentTimeMillis();
@ -96,9 +96,6 @@ public class AppSchedulable extends Schedulable {
return startTime; return startTime;
} }
@Override
public void redistributeShare() {}
@Override @Override
public Resource getResourceUsage() { public Resource getResourceUsage() {
return app.getCurrentConsumption(); return app.getCurrentConsumption();
@ -114,7 +111,7 @@ public class AppSchedulable extends Schedulable {
* Get metrics reference from containing queue. * Get metrics reference from containing queue.
*/ */
public QueueMetrics getMetrics() { public QueueMetrics getMetrics() {
return queue.getQueueSchedulable().getMetrics(); return queue.getMetrics();
} }
@Override @Override

View File

@ -22,73 +22,57 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
@Private public class FSLeafQueue extends FSQueue {
@Unstable private static final Log LOG = LogFactory.getLog(
public class FSQueueSchedulable extends Schedulable implements Queue { FSLeafQueue.class.getName());
public static final Log LOG = LogFactory.getLog(
FSQueueSchedulable.class.getName()); private final List<AppSchedulable> appScheds =
new ArrayList<AppSchedulable>();
private FairScheduler scheduler; /** Scheduling mode for jobs inside the queue (fair or FIFO) */
private FSQueue queue; private SchedulingMode schedulingMode;
private QueueManager queueMgr;
private List<AppSchedulable> appScheds = new LinkedList<AppSchedulable>(); private final FairScheduler scheduler;
private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0); private Resource demand = Resources.createResource(0);
private QueueMetrics metrics;
private RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// Variables used for preemption // Variables used for preemption
long lastTimeAtMinShare; private long lastTimeAtMinShare;
long lastTimeAtHalfFairShare; private long lastTimeAtHalfFairShare;
// Constructor for tests public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
protected FSQueueSchedulable(FairScheduler scheduler, FSQueue fsQueue, FSParentQueue parent) {
QueueManager qMgr, QueueMetrics metrics, long minShare, long fairShare) { super(name, queueMgr, scheduler, parent);
this.scheduler = scheduler; this.scheduler = scheduler;
this.queueMgr = qMgr; this.queueMgr = queueMgr;
this.queue = fsQueue;
this.metrics = metrics;
this.lastTimeAtMinShare = minShare;
this.lastTimeAtHalfFairShare = fairShare;
}
public FSQueueSchedulable(FairScheduler scheduler, FSQueue queue) {
this.scheduler = scheduler;
this.queue = queue;
this.queueMgr = scheduler.getQueueManager();
this.metrics = QueueMetrics.forQueue(getName(), null, true, scheduler.getConf());
this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
} }
public void addApp(AppSchedulable app) { public void addApp(FSSchedulerApp app) {
appScheds.add(app); AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
appScheds.add(appSchedulable);
} }
// for testing
void addAppSchedulable(AppSchedulable appSched) {
appScheds.add(appSched);
}
public void removeApp(FSSchedulerApp app) { public void removeApp(FSSchedulerApp app) {
for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) { for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
AppSchedulable appSched = it.next(); AppSchedulable appSched = it.next();
@ -98,17 +82,47 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
} }
} }
} }
/**
 * Returns the schedulables for the applications in this leaf queue.
 * The returned collection is the queue's own backing list, not a copy.
 */
public Collection<AppSchedulable> getAppSchedulables() {
  return this.appScheds;
}
/**
 * Sets the mode (fair or FIFO) used to order applications inside this queue.
 *
 * @param mode the scheduling mode to apply from now on
 */
public void setSchedulingMode(SchedulingMode mode) {
  schedulingMode = mode;
}
/**
 * Distributes this queue's fair share among its applications.
 * In FAIR mode the share is split by the fair-share algorithm; in any other
 * mode every application's fair share is reset to an empty resource.
 */
@Override
public void recomputeFairShares() {
  if (schedulingMode != SchedulingMode.FAIR) {
    for (AppSchedulable appSched : appScheds) {
      appSched.setFairShare(Resources.createResource(0));
    }
    return;
  }
  SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
}
/**
 * Returns the demand computed by the most recent {@code updateDemand()} call.
 */
@Override
public Resource getDemand() {
  return this.demand;
}
/**
 * Sums the current resource usage of every application in this queue.
 *
 * @return a freshly created resource holding the combined usage
 */
@Override
public Resource getResourceUsage() {
  Resource total = Resources.createResource(0);
  Iterator<AppSchedulable> it = appScheds.iterator();
  while (it.hasNext()) {
    Resources.addTo(total, it.next().getResourceUsage());
  }
  return total;
}
/**
* Update demand by asking apps in the queue to update
*/
@Override @Override
public void updateDemand() { public void updateDemand() {
// Compute demand by iterating through apps in the queue // Compute demand by iterating through apps in the queue
// Limit demand to maxResources // Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(queue.getName()); Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0); demand = Resources.createResource(0);
for (AppSchedulable sched: appScheds) { for (AppSchedulable sched : appScheds) {
sched.updateDemand(); sched.updateDemand();
Resource toAdd = sched.getDemand(); Resource toAdd = sched.getDemand();
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -128,46 +142,12 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
} }
} }
/**
* Distribute the queue's fair share among its jobs
*/
@Override
public void redistributeShare() {
if (queue.getSchedulingMode() == SchedulingMode.FAIR) {
SchedulingAlgorithms.computeFairShares(appScheds, getFairShare());
} else {
for (AppSchedulable sched: appScheds) {
sched.setFairShare(Resources.createResource(0));
}
}
}
@Override
public Resource getDemand() {
return demand;
}
@Override
public Resource getMinShare() {
return queueMgr.getMinResources(queue.getName());
}
@Override
public double getWeight() {
return queueMgr.getQueueWeight(queue.getName());
}
@Override
public long getStartTime() {
return 0;
}
@Override @Override
public Resource assignContainer(FSSchedulerNode node, boolean reserved) { public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved); LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved);
// If this queue is over its limit, reject // If this queue is over its limit, reject
if (Resources.greaterThan(getResourceUsage(), if (Resources.greaterThan(getResourceUsage(),
queueMgr.getMaxResources(queue.getName()))) { queueMgr.getMaxResources(getName()))) {
return Resources.none(); return Resources.none();
} }
@ -185,15 +165,14 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
// Otherwise, chose app to schedule based on given policy (fair vs fifo). // Otherwise, chose app to schedule based on given policy (fair vs fifo).
else { else {
SchedulingMode mode = queue.getSchedulingMode();
Comparator<Schedulable> comparator; Comparator<Schedulable> comparator;
if (mode == SchedulingMode.FIFO) { if (schedulingMode == SchedulingMode.FIFO) {
comparator = new SchedulingAlgorithms.FifoComparator(); comparator = new SchedulingAlgorithms.FifoComparator();
} else if (mode == SchedulingMode.FAIR) { } else if (schedulingMode == SchedulingMode.FAIR) {
comparator = new SchedulingAlgorithms.FairShareComparator(); comparator = new SchedulingAlgorithms.FairShareComparator();
} else { } else {
throw new RuntimeException("Unsupported queue scheduling mode " + mode); throw new RuntimeException("Unsupported queue scheduling mode " +
schedulingMode);
} }
Collections.sort(appScheds, comparator); Collections.sort(appScheds, comparator);
@ -203,81 +182,13 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
return Resources.none(); return Resources.none();
} }
} }
@Override @Override
public String getName() { public Collection<FSQueue> getChildQueues() {
return queue.getName(); return new ArrayList<FSQueue>(1);
} }
FSQueue getQueue() {
return queue;
}
public Collection<AppSchedulable> getAppSchedulables() {
return appScheds;
}
public long getLastTimeAtMinShare() {
return lastTimeAtMinShare;
}
public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
public long getLastTimeAtHalfFairShare() {
return lastTimeAtHalfFairShare;
}
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
@Override
public QueueMetrics getMetrics() {
return metrics;
}
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
for (AppSchedulable app : appScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
}
@Override
public Priority getPriority() {
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
}
@Override
public Map<QueueACL, AccessControlList> getQueueAcls() {
Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
return new HashMap<QueueACL, AccessControlList>(acls);
}
@Override
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(getQueueName());
// TODO: we might change these queue metrics around a little bit
// to match the semantics of the fair scheduler.
queueInfo.setCapacity((float) getFairShare().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setCapacity((float) getResourceUsage().getMemory() /
scheduler.getClusterCapacity().getMemory());
queueInfo.setChildQueues(new ArrayList<QueueInfo>());
queueInfo.setQueueState(QueueState.RUNNING);
return queueInfo;
}
@Override @Override
public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) { public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
QueueUserACLInfo userAclInfo = QueueUserACLInfo userAclInfo =
@ -294,9 +205,20 @@ public class FSQueueSchedulable extends Schedulable implements Queue {
userAclInfo.setUserAcls(operations); userAclInfo.setUserAcls(operations);
return Collections.singletonList(userAclInfo); return Collections.singletonList(userAclInfo);
} }
/**
 * Returns the preemption bookkeeping timestamp last stored through
 * {@code setLastTimeAtMinShare}; it starts out as the scheduler clock's
 * time at construction.
 */
public long getLastTimeAtMinShare() {
  return this.lastTimeAtMinShare;
}
@Override public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
public String getQueueName() { this.lastTimeAtMinShare = lastTimeAtMinShare;
return getName(); }
/**
 * Returns the preemption bookkeeping timestamp last stored through
 * {@code setLastTimeAtHalfFairShare}; it starts out as the scheduler
 * clock's time at construction.
 */
public long getLastTimeAtHalfFairShare() {
  return this.lastTimeAtHalfFairShare;
}
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
} }
} }

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
/**
 * A fair scheduler queue that contains other queues instead of applications.
 * Demand and resource usage are aggregated from the child queues, and fair
 * shares are computed for the children and then pushed down recursively.
 */
public class FSParentQueue extends FSQueue {
  private static final Log LOG = LogFactory.getLog(
      FSParentQueue.class.getName());

  private final List<FSQueue> childQueues =
      new ArrayList<FSQueue>();
  private final QueueManager queueMgr;
  private Resource demand = Resources.createResource(0);

  /**
   * Creates a parent queue with no children.
   *
   * @param name the queue name
   * @param queueMgr the queue manager consulted for this queue's limits
   * @param scheduler the owning scheduler
   * @param parent the enclosing parent queue; {@code hasAccess} treats
   *        {@code null} as "no further ancestor"
   */
  public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
      FSParentQueue parent) {
    super(name, queueMgr, scheduler, parent);
    this.queueMgr = queueMgr;
  }

  /**
   * Registers a queue as a direct child of this queue.
   *
   * @param child the queue to add
   */
  public void addChildQueue(FSQueue child) {
    childQueues.add(child);
  }

  /**
   * Splits this queue's fair share among its children, publishes each
   * child's share to its metrics, and recurses into each child.
   */
  @Override
  public void recomputeFairShares() {
    SchedulingAlgorithms.computeFairShares(childQueues, getFairShare());
    for (FSQueue childQueue : childQueues) {
      childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare());
      childQueue.recomputeFairShares();
    }
  }

  /**
   * Returns the demand computed by the most recent {@link #updateDemand()}.
   */
  @Override
  public Resource getDemand() {
    return demand;
  }

  /**
   * Sums the current resource usage of all child queues.
   *
   * @return a freshly created resource holding the combined usage
   */
  @Override
  public Resource getResourceUsage() {
    Resource usage = Resources.createResource(0);
    for (FSQueue child : childQueues) {
      Resources.addTo(usage, child.getResourceUsage());
    }
    return usage;
  }

  /**
   * Recomputes this queue's demand as the sum of its children's demands,
   * capped at the queue's configured maximum resources. Children are asked
   * to refresh their own demand first; iteration stops as soon as the cap
   * is reached, so later children are not refreshed in that case.
   */
  @Override
  public void updateDemand() {
    // Compute demand by iterating through the child queues
    // Limit demand to maxResources
    Resource maxRes = queueMgr.getMaxResources(getName());
    demand = Resources.createResource(0);
    for (FSQueue childQueue : childQueues) {
      childQueue.updateDemand();
      Resource toAdd = childQueue.getDemand();
      demand = Resources.add(demand, toAdd);
      // Log after accumulating so that "now" reports the total that
      // includes this child's contribution.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Counting resource from " + childQueue.getName() + " " +
            toAdd + "; Total resource consumption for " + getName() +
            " now " + demand);
      }
      if (Resources.greaterThanOrEqual(demand, maxRes)) {
        demand = maxRes;
        break;
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("The updated demand for " + getName() + " is " + demand +
          "; the max is " + maxRes);
    }
  }

  /**
   * Checks whether a user may perform an operation on this queue. Access is
   * granted if this queue's ACL for the operation allows the user, or
   * failing that, if any ancestor queue grants it.
   *
   * @param acl the operation being checked
   * @param user the user requesting access
   * @return {@code true} if this queue or an ancestor allows the operation
   */
  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
    synchronized (this) {
      if (getQueueAcls().get(acl).isUserAllowed(user)) {
        return true;
      }
    }
    if (parent != null) {
      return parent.hasAccess(acl, user);
    }
    return false;
  }

  /**
   * Builds the ACL record for this queue alone, listing every operation the
   * user has access to according to {@link #hasAccess}.
   */
  private synchronized QueueUserACLInfo getUserAclInfo(
      UserGroupInformation user) {
    QueueUserACLInfo userAclInfo =
        recordFactory.newRecordInstance(QueueUserACLInfo.class);
    List<QueueACL> operations = new ArrayList<QueueACL>();
    for (QueueACL operation : QueueACL.values()) {
      if (hasAccess(operation, user)) {
        operations.add(operation);
      }
    }
    userAclInfo.setQueueName(getQueueName());
    userAclInfo.setUserAcls(operations);
    return userAclInfo;
  }

  /**
   * Returns ACL records for this queue followed by those reported by each
   * child queue.
   *
   * @param user the user whose permissions are reported
   */
  @Override
  public synchronized List<QueueUserACLInfo> getQueueUserAclInfo(
      UserGroupInformation user) {
    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();

    // Add queue acls
    userAcls.add(getUserAclInfo(user));

    // Add children queue acls
    for (FSQueue child : childQueues) {
      userAcls.addAll(child.getQueueUserAclInfo(user));
    }
    return userAcls;
  }

  /**
   * Always fails: this class does not accept containers.
   *
   * @throws IllegalStateException on every call
   */
  @Override
  public Resource assignContainer(FSSchedulerNode node, boolean reserved) {
    throw new IllegalStateException(
        "Parent queue should not be assigned container");
  }

  /**
   * Returns the direct children of this queue. The returned collection is
   * the queue's own backing list, not a copy.
   */
  @Override
  public Collection<FSQueue> getChildQueues() {
    return childQueues;
  }
}

View File

@ -20,65 +20,112 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
/** public abstract class FSQueue extends Schedulable implements Queue {
* A queue containing several applications. private final String name;
*/ private final QueueManager queueMgr;
@Private private final FairScheduler scheduler;
@Unstable private final QueueMetrics metrics;
public class FSQueue {
/** Queue name. */ protected final FSParentQueue parent;
private String name; protected final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
/** Applications in this specific queue; does not include children queues' jobs. */
private Collection<FSSchedulerApp> applications = public FSQueue(String name, QueueManager queueMgr,
new ArrayList<FSSchedulerApp>(); FairScheduler scheduler, FSParentQueue parent) {
/** Scheduling mode for jobs inside the queue (fair or FIFO) */
private SchedulingMode schedulingMode;
private FairScheduler scheduler;
private FSQueueSchedulable queueSchedulable;
public FSQueue(FairScheduler scheduler, String name) {
this.name = name; this.name = name;
this.queueSchedulable = new FSQueueSchedulable(scheduler, this); this.queueMgr = queueMgr;
this.scheduler = scheduler; this.scheduler = scheduler;
this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
this.parent = parent;
} }
public Collection<FSSchedulerApp> getApplications() {
return applications;
}
public void addApp(FSSchedulerApp app) {
applications.add(app);
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
queueSchedulable.addApp(appSchedulable);
}
public void removeApp(FSSchedulerApp app) {
applications.remove(app);
queueSchedulable.removeApp(app);
}
public String getName() { public String getName() {
return name; return name;
} }
public SchedulingMode getSchedulingMode() { @Override
return schedulingMode; public String getQueueName() {
return name;
}
@Override
public double getWeight() {
return queueMgr.getQueueWeight(getName());
}
@Override
public Resource getMinShare() {
return queueMgr.getMinResources(getName());
} }
public void setSchedulingMode(SchedulingMode schedulingMode) { @Override
this.schedulingMode = schedulingMode; public long getStartTime() {
return 0;
} }
public FSQueueSchedulable getQueueSchedulable() { @Override
return queueSchedulable; public Priority getPriority() {
Priority p = recordFactory.newRecordInstance(Priority.class);
p.setPriority(1);
return p;
} }
/**
 * Builds a {@link QueueInfo} record describing this queue.
 *
 * Capacity is reported as the queue's fair share of memory divided by the
 * cluster's memory, and current capacity as the queue's memory usage
 * divided by the cluster's memory. Both are reported as 0 when the cluster
 * has no memory, rather than NaN or infinity.
 *
 * @param includeChildQueues whether to include records for direct children
 * @param recursive whether included children should in turn include all of
 *        their descendants
 * @return the populated record; its state is always RUNNING
 */
@Override
public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
  QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
  queueInfo.setQueueName(getQueueName());
  // TODO: we might change these queue metrics around a little bit
  // to match the semantics of the fair scheduler.
  float clusterMemory = scheduler.getClusterCapacity().getMemory();
  if (clusterMemory == 0) {
    queueInfo.setCapacity(0.0f);
    queueInfo.setCurrentCapacity(0.0f);
  } else {
    queueInfo.setCapacity(getFairShare().getMemory() / clusterMemory);
    // Usage goes into current capacity; previously it was written with
    // setCapacity and silently overwrote the fair-share value above.
    queueInfo.setCurrentCapacity(getResourceUsage().getMemory() / clusterMemory);
  }

  ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
  if (includeChildQueues) {
    Collection<FSQueue> childQueues = getChildQueues();
    for (FSQueue child : childQueues) {
      childQueueInfos.add(child.getQueueInfo(recursive, recursive));
    }
  }
  queueInfo.setChildQueues(childQueueInfos);
  queueInfo.setQueueState(QueueState.RUNNING);
  return queueInfo;
}
/**
 * Returns a copy of the ACLs the queue manager holds for this queue.
 * Changes to the returned map do not affect the queue manager's map.
 */
@Override
public Map<QueueACL, AccessControlList> getQueueAcls() {
  return new HashMap<QueueACL, AccessControlList>(
      queueMgr.getQueueAcls(getName()));
}
/**
 * Returns the metrics object created for this queue in the constructor.
 */
@Override
public QueueMetrics getMetrics() {
  return this.metrics;
}
/**
 * Recomputes the fair shares for all queues and applications
 * under this queue, based on this queue's own current fair share.
 * The scheduler's update pass sets the root queue's fair share to the
 * cluster capacity before invoking this on the root.
 */
public abstract void recomputeFairShares();
/**
 * Gets the children of this queue, if any.
 * Leaf queues return an empty collection; parent queues return their
 * direct children.
 */
public abstract Collection<FSQueue> getChildQueues();
} }

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashMap; import java.util.HashMap;
@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
/**
* A scheduler that schedules resources between a set of queues. The scheduler
* keeps track of the resources used by each queue, and attempts to maintain
* fairness by scheduling tasks at queues whose allocations are farthest below
* an ideal fair distribution.
*
* The fair scheduler supports hierarchical queues. All queues descend from a
* queue named "root". Available resources are distributed among the children
* of the root queue in the typical fair scheduling fashion. Then, the children
* distribute the resources assigned to them to their children in the same
* fashion. Applications may only be scheduled on leaf queues. Queues can be
* specified as children of other queues by placing them as sub-elements of their
* parents in the fair scheduler configuration file.
*
* A queue's name starts with the names of its parents, with periods as
* separators. So a queue named "queue1" under the root queue would be
* referred to as "root.queue1", and a queue named "queue2" under a queue
* named "parent1" would be referred to as "root.parent1.queue2".
*/
@LimitedPrivate("yarn") @LimitedPrivate("yarn")
@Unstable @Unstable
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -105,23 +125,22 @@ public class FairScheduler implements ResourceScheduler {
// Aggregate metrics // Aggregate metrics
QueueMetrics rootMetrics; QueueMetrics rootMetrics;
//Time when we last updated preemption vars // Time when we last updated preemption vars
protected long lastPreemptionUpdateTime; protected long lastPreemptionUpdateTime;
//Time we last ran preemptTasksIfNecessary // Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime; private long lastPreemptCheckTime;
// This stores per-application scheduling information, indexed by // This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup. // attempt ID's for fast lookup.
protected Map<ApplicationAttemptId, FSSchedulerApp> applications protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
= new HashMap<ApplicationAttemptId, FSSchedulerApp>(); new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId // Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes = private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>(); new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster // Aggregate capacity of the cluster
private Resource clusterCapacity = private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple // How often tasks are preempted (must be longer than a couple
@ -131,10 +150,11 @@ public class FairScheduler implements ResourceScheduler {
protected boolean preemptionEnabled; protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
protected double nodeLocalityThreshold; // Cluster threshold for node locality protected double nodeLocalityThreshold; // Cluster threshold for node locality
protected double rackLocalityThreshold; // Cluster threshold for rack locality protected double rackLocalityThreshold; // Cluster threshold for rack locality
private FairSchedulerEventLog eventLog; // Machine-readable event log private FairSchedulerEventLog eventLog; // Machine-readable event log
protected boolean assignMultiple; // Allocate multiple containers per heartbeat protected boolean assignMultiple; // Allocate multiple containers per
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat protected int maxAssign; // Max containers to assign per heartbeat
public FairScheduler() { public FairScheduler() {
@ -150,16 +170,8 @@ public class FairScheduler implements ResourceScheduler {
return queueMgr; return queueMgr;
} }
public List<FSQueueSchedulable> getQueueSchedulables() {
List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
for (FSQueue queue: queueMgr.getQueues()) {
scheds.add(queue.getQueueSchedulable());
}
return scheds;
}
private RMContainer getRMContainer(ContainerId containerId) { private RMContainer getRMContainer(ContainerId containerId) {
FSSchedulerApp application = FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId()); applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId); return (application == null) ? null : application.getRMContainer(containerId);
} }
@ -183,34 +195,24 @@ public class FairScheduler implements ResourceScheduler {
} }
/** /**
* Recompute the internal variables used by the scheduler - per-job weights, * Recompute the internal variables used by the scheduler - per-job weights,
* fair shares, deficits, minimum slot allocations, and amount of used and * fair shares, deficits, minimum slot allocations, and amount of used and
* required resources per job. * required resources per job.
*/ */
protected synchronized void update() { protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updateRunnability(); // Set job runnability based on user/queue limits updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption updatePreemptionVariables(); // Determine if any queues merit preemption
// Update demands of apps and queues FSQueue rootQueue = queueMgr.getRootQueue();
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().updateDemand();
}
// Compute fair shares based on updated demands // Recursively update demands for all queues
List<FSQueueSchedulable> queueScheds = getQueueSchedulables(); rootQueue.updateDemand();
SchedulingAlgorithms.computeFairShares(
queueScheds, clusterCapacity);
// Update queue metrics for this queue rootQueue.setFairShare(clusterCapacity);
for (FSQueueSchedulable sched : queueScheds) { // Recursively compute fair shares for all queues
sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare()); // and update metrics
} rootQueue.recomputeFairShares();
// Use the computed shares to assign shares within each queue
for (FSQueue queue: queueMgr.getQueues()) {
queue.getQueueSchedulable().redistributeShare();
}
// Update recorded capacity of root queue (child queues are updated // Update recorded capacity of root queue (child queues are updated
// when fair share is calculated). // when fair share is calculated).
@ -225,7 +227,7 @@ public class FairScheduler implements ResourceScheduler {
private void updatePreemptionVariables() { private void updatePreemptionVariables() {
long now = clock.getTime(); long now = clock.getTime();
lastPreemptionUpdateTime = now; lastPreemptionUpdateTime = now;
for (FSQueueSchedulable sched: getQueueSchedulables()) { for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) { if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now); sched.setLastTimeAtMinShare(now);
} }
@ -238,16 +240,16 @@ public class FairScheduler implements ResourceScheduler {
/** /**
* Is a queue below its min share for the given task type? * Is a queue below its min share for the given task type?
*/ */
boolean isStarvedForMinShare(FSQueueSchedulable sched) { boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand()); Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredShare); return Resources.lessThan(sched.getResourceUsage(), desiredShare);
} }
/** /**
* Is a queue being starved for fair share for the given task type? * Is a queue being starved for fair share for the given task type? This is
* This is defined as being below half its fair share. * defined as being below half its fair share.
*/ */
boolean isStarvedForFairShare(FSQueueSchedulable sched) { boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.max( Resource desiredFairShare = Resources.max(
Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredFairShare); return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@ -255,10 +257,10 @@ public class FairScheduler implements ResourceScheduler {
/** /**
* Check for queues that need tasks preempted, either because they have been * Check for queues that need tasks preempted, either because they have been
* below their guaranteed share for minSharePreemptionTimeout or they * below their guaranteed share for minSharePreemptionTimeout or they have
* have been below half their fair share for the fairSharePreemptionTimeout. * been below half their fair share for the fairSharePreemptionTimeout. If
* If such queues exist, compute how many tasks of each type need to be * such queues exist, compute how many tasks of each type need to be preempted
* preempted and then select the right ones using preemptTasks. * and then select the right ones using preemptTasks.
*/ */
protected synchronized void preemptTasksIfNecessary() { protected synchronized void preemptTasksIfNecessary() {
if (!preemptionEnabled) { if (!preemptionEnabled) {
@ -273,35 +275,37 @@ public class FairScheduler implements ResourceScheduler {
Resource resToPreempt = Resources.none(); Resource resToPreempt = Resources.none();
for (FSQueueSchedulable sched: getQueueSchedulables()) { for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
} }
if (Resources.greaterThan(resToPreempt, Resources.none())) { if (Resources.greaterThan(resToPreempt, Resources.none())) {
preemptResources(getQueueSchedulables(), resToPreempt); preemptResources(queueMgr.getLeafQueues(), resToPreempt);
} }
} }
/** /**
* Preempt a quantity of resources from a list of QueueSchedulables. * Preempt a quantity of resources from a list of QueueSchedulables. The
* The policy for this is to pick apps from queues that are over their fair * policy for this is to pick apps from queues that are over their fair share,
* share, but make sure that no queue is placed below its fair share in the * but make sure that no queue is placed below its fair share in the process.
* process. We further prioritize preemption by choosing containers with * We further prioritize preemption by choosing containers with lowest
* lowest priority to preempt. * priority to preempt.
*/ */
protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) { protected void preemptResources(Collection<FSLeafQueue> scheds,
Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
return; return;
} }
Map<RMContainer, FSSchedulerApp> apps = Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>(); new HashMap<RMContainer, FSSchedulerApp>();
Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>(); Map<RMContainer, FSLeafQueue> queues =
new HashMap<RMContainer, FSLeafQueue>();
// Collect running containers from over-scheduled queues // Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>(); List<RMContainer> runningContainers = new ArrayList<RMContainer>();
for (FSQueueSchedulable sched: scheds) { for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) { if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as: sched.getAppSchedulables()) { for (AppSchedulable as : sched.getAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) { for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c); runningContainers.add(c);
apps.put(c, as.getApp()); apps.put(c, as.getApp());
@ -321,12 +325,12 @@ public class FairScheduler implements ResourceScheduler {
// Scan down the sorted list of task statuses until we've killed enough // Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue // tasks, making sure we don't kill too many from any queue
for (RMContainer container: runningContainers) { for (RMContainer container : runningContainers) {
FSQueueSchedulable sched = queues.get(container); FSLeafQueue sched = queues.get(container);
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) { if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() + "res=" + container.getContainer().getResource() +
") from queue " + sched.getQueue().getName()); ") from queue " + sched.getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus( ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
@ -348,12 +352,12 @@ public class FairScheduler implements ResourceScheduler {
* If the queue has been below its min share for at least its preemption * If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and * timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the * this min share. If it has been below half its fair share for at least the
* fairSharePreemptionTimeout, it should preempt enough tasks to get up to * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
* its full fair share. If both conditions hold, we preempt the max of the * full fair share. If both conditions hold, we preempt the max of the two
* two amounts (this shouldn't happen unless someone sets the timeouts to * amounts (this shouldn't happen unless someone sets the timeouts to be
* be identical for some reason). * identical for some reason).
*/ */
protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) { protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName(); String queue = sched.getName();
long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue); long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout(); long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@ -362,7 +366,7 @@ public class FairScheduler implements ResourceScheduler {
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(sched.getMinShare(), sched.getDemand()); Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(Resources.none(), resDueToMinShare = Resources.max(Resources.none(),
Resources.subtract(target, sched.getResourceUsage())); Resources.subtract(target, sched.getResourceUsage()));
} }
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(sched.getFairShare(), sched.getDemand()); Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@ -380,15 +384,15 @@ public class FairScheduler implements ResourceScheduler {
} }
/** /**
* This updates the runnability of all apps based on whether or not * This updates the runnability of all apps based on whether or not any
* any users/queues have exceeded their capacity. * users/queues have exceeded their capacity.
*/ */
private void updateRunnability() { private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>(); List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable // Start by marking everything as not runnable
for (FSQueue p: queueMgr.getQueues()) { for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) { for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false); a.setRunnable(false);
apps.add(a); apps.add(a);
} }
@ -400,7 +404,7 @@ public class FairScheduler implements ResourceScheduler {
Map<String, Integer> userApps = new HashMap<String, Integer>(); Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>(); Map<String, Integer> queueApps = new HashMap<String, Integer>();
for (AppSchedulable app: apps) { for (AppSchedulable app : apps) {
String user = app.getApp().getUser(); String user = app.getApp().getUser();
String queue = app.getApp().getQueueName(); String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0; int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@ -473,22 +477,25 @@ public class FairScheduler implements ResourceScheduler {
} }
/** /**
* Add a new application to the scheduler, with a given id, queue name, * Add a new application to the scheduler, with a given id, queue name, and
* and user. This will accept a new app even if the user or queue is above * user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable. * configured limits, but the app will not be marked as runnable.
*/ */
protected synchronized void protected synchronized void addApplication(
addApplication(ApplicationAttemptId applicationAttemptId, ApplicationAttemptId applicationAttemptId, String queueName, String user) {
String queueName, String user) {
FSQueue queue = queueMgr.getQueue(queueName); FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
if (queue == null) {
// queue is not an existing or createable leaf queue
queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
FSSchedulerApp schedulerApp = FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user, new FSSchedulerApp(applicationAttemptId, user,
queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()), queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext); rmContext);
// Inforce ACLs // Enforce ACLs
UserGroupInformation userUgi; UserGroupInformation userUgi;
try { try {
userUgi = UserGroupInformation.getCurrentUser(); userUgi = UserGroupInformation.getCurrentUser();
@ -497,8 +504,8 @@ public class FairScheduler implements ResourceScheduler {
return; return;
} }
List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo( // Always a singleton list
userUgi); // Always a signleton list List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) { if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
LOG.info("User " + userUgi.getUserName() + LOG.info("User " + userUgi.getUserName() +
" cannot submit" + " applications to queue " + queue.getName()); " cannot submit" + " applications to queue " + queue.getName());
@ -506,14 +513,13 @@ public class FairScheduler implements ResourceScheduler {
} }
queue.addApp(schedulerApp); queue.addApp(schedulerApp);
queue.getQueueSchedulable().getMetrics().submitApp(user, queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
applicationAttemptId.getAttemptId());
rootMetrics.submitApp(user, applicationAttemptId.getAttemptId()); rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp); applications.put(applicationAttemptId, schedulerApp);
LOG.info("Application Submission: " + applicationAttemptId + LOG.info("Application Submission: " + applicationAttemptId +
", user: " + user + ", user: "+ user +
", currently active: " + applications.size()); ", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle( rmContext.getDispatcher().getEventHandler().handle(
@ -540,10 +546,10 @@ public class FairScheduler implements ResourceScheduler {
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(), rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION), SchedulerUtils.COMPLETED_APPLICATION),
RMContainerEventType.KILL); RMContainerEventType.KILL);
} }
// Release all reserved containers // Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) { for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer, completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
@ -556,7 +562,8 @@ public class FairScheduler implements ResourceScheduler {
application.stop(rmAppAttemptFinalState); application.stop(rmAppAttemptFinalState);
// Inform the queue // Inform the queue
FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName()); FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName());
queue.removeApp(application); queue.removeApp(application);
// Remove from our data-structure // Remove from our data-structure
@ -658,11 +665,11 @@ public class FairScheduler implements ResourceScheduler {
for (ContainerId releasedContainerId : release) { for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId); RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) { if (rmContainer == null) {
RMAuditLogger.logFailure(application.getUser(), RMAuditLogger.logFailure(application.getUser(),
AuditConstants.RELEASE_CONTAINER, AuditConstants.RELEASE_CONTAINER,
"Unauthorized access or invalid container", "FairScheduler", "Unauthorized access or invalid container", "FairScheduler",
"Trying to release container not owned by app or with invalid id", "Trying to release container not owned by app or with invalid id",
application.getApplicationId(), releasedContainerId); application.getApplicationId(), releasedContainerId);
} }
completedContainer(rmContainer, completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
@ -675,8 +682,8 @@ public class FairScheduler implements ResourceScheduler {
if (!ask.isEmpty()) { if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" + LOG.debug("allocate: pre-update" +
" applicationAttemptId=" + appAttemptId + " applicationAttemptId=" + appAttemptId +
" application=" + application.getApplicationId()); " application=" + application.getApplicationId());
} }
application.showRequests(); application.showRequests();
@ -689,19 +696,17 @@ public class FairScheduler implements ResourceScheduler {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("allocate:" + LOG.debug("allocate:" +
" applicationAttemptId=" + appAttemptId + " applicationAttemptId=" + appAttemptId +
" #ask=" + ask.size()); " #ask=" + ask.size());
} }
return new Allocation( return new Allocation(application.pullNewlyAllocatedContainers(),
application.pullNewlyAllocatedContainers(),
application.getHeadroom()); application.getHeadroom());
} }
} }
/** /**
* Process a container which has launched on a node, as reported by the * Process a container which has launched on a node, as reported by the node.
* node.
*/ */
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container // Get the application for the finished container
@ -757,20 +762,20 @@ public class FairScheduler implements ResourceScheduler {
LOG.info("Trying to fulfill reservation for application " + LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm); reservedApplication.getApplicationId() + " on node: " + nm);
FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName()); FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
queue.getQueueSchedulable().assignContainer(node, true); queue.assignContainer(node, true);
} }
// Otherwise, schedule at queue which is furthest below fair share // Otherwise, schedule at queue which is furthest below fair share
else { else {
int assignedContainers = 0; int assignedContainers = 0;
while (true) { while (true) {
// At most one task is scheduled each iteration of this loop // At most one task is scheduled each iteration of this loop
List<FSQueueSchedulable> scheds = getQueueSchedulables(); List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
queueMgr.getLeafQueues());
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false; boolean assignedContainer = false;
for (FSQueueSchedulable sched : scheds) { for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false); Resource assigned = sched.assignContainer(node, false);
if (Resources.greaterThan(assigned, Resources.none())) { if (Resources.greaterThan(assigned, Resources.none())) {
eventLog.log("ASSIGN", nm.getHostName(), assigned); eventLog.log("ASSIGN", nm.getHostName(), assigned);
@ -813,7 +818,7 @@ public class FairScheduler implements ResourceScheduler {
@Override @Override
public void handle(SchedulerEvent event) { public void handle(SchedulerEvent event) {
switch(event.getType()) { switch (event.getType()) {
case NODE_ADDED: case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) { if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
@ -832,8 +837,7 @@ public class FairScheduler implements ResourceScheduler {
if (!(event instanceof NodeUpdateSchedulerEvent)) { if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
NodeUpdateSchedulerEvent nodeUpdatedEvent = NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
(NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(), nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(), nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers()); nodeUpdatedEvent.getCompletedContainers());
@ -842,7 +846,7 @@ public class FairScheduler implements ResourceScheduler {
if (!(event instanceof AppAddedSchedulerEvent)) { if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
String queue = appAddedEvent.getQueue(); String queue = appAddedEvent.getQueue();
// Potentially set queue to username if configured to do so // Potentially set queue to username if configured to do so
@ -867,7 +871,7 @@ public class FairScheduler implements ResourceScheduler {
throw new RuntimeException("Unexpected event type: " + event); throw new RuntimeException("Unexpected event type: " + event);
} }
ContainerExpiredSchedulerEvent containerExpiredEvent = ContainerExpiredSchedulerEvent containerExpiredEvent =
(ContainerExpiredSchedulerEvent) event; (ContainerExpiredSchedulerEvent)event;
ContainerId containerId = containerExpiredEvent.getContainerId(); ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId), completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus( SchedulerUtils.createAbnormalContainerStatus(
@ -886,8 +890,8 @@ public class FairScheduler implements ResourceScheduler {
} }
@Override @Override
public synchronized void public synchronized void reinitialize(Configuration conf, RMContext rmContext)
reinitialize(Configuration conf, RMContext rmContext) throws IOException { throws IOException {
if (!initialized) { if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf); this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf); rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
@ -909,11 +913,10 @@ public class FairScheduler implements ResourceScheduler {
try { try {
queueMgr.initialize(); queueMgr.initialize();
} } catch (Exception e) {
catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e); throw new IOException("Failed to start FairScheduler", e);
} }
Thread updateThread = new Thread(new UpdateThread()); Thread updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread"); updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true); updateThread.setDaemon(true);
@ -925,10 +928,9 @@ public class FairScheduler implements ResourceScheduler {
rackLocalityThreshold = this.conf.getLocalityThresholdRack(); rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled(); preemptionEnabled = this.conf.getPreemptionEnabled();
try { try {
queueMgr.reloadAllocs(); queueMgr.reloadAllocs();
} } catch (Exception e) {
catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e); throw new IOException("Failed to initialize FairScheduler", e);
} }
} }
@ -940,8 +942,8 @@ public class FairScheduler implements ResourceScheduler {
if (!queueMgr.exists(queueName)) { if (!queueMgr.exists(queueName)) {
return null; return null;
} }
return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo( return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
includeChildQueues, recursive); recursive);
} }
@Override @Override
@ -953,12 +955,7 @@ public class FairScheduler implements ResourceScheduler {
return new ArrayList<QueueUserACLInfo>(); return new ArrayList<QueueUserACLInfo>();
} }
List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>(); return queueMgr.getRootQueue().getQueueUserAclInfo(user);
for (FSQueue queue : queueMgr.getQueues()) {
userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
}
return userAcls;
} }
@Override @Override

View File

@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.DocumentBuilderFactory;
@ -52,6 +53,7 @@ import org.xml.sax.SAXException;
/** /**
* Maintains a list of queues as well as scheduling parameters for each queue, * Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file. * such as guaranteed share allocations, from the fair scheduler config file.
*
*/ */
@Private @Private
@Unstable @Unstable
@ -59,6 +61,8 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog( public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName()); QueueManager.class.getName());
public static final String ROOT_QUEUE = "root";
/** Time to wait between checks of the allocation file */ /** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000; public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
@ -76,7 +80,10 @@ public class QueueManager {
// used) or a String to specify an absolute path (if // used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used). // mapred.fairscheduler.allocation.file is used).
private Map<String, FSQueue> queues = new HashMap<String, FSQueue>(); private final Collection<FSLeafQueue> leafQueues =
new CopyOnWriteArrayList<FSLeafQueue>();
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo(); private volatile QueueManagerInfo info = new QueueManagerInfo();
@ -87,10 +94,17 @@ public class QueueManager {
public QueueManager(FairScheduler scheduler) { public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler; this.scheduler = scheduler;
} }
/**
 * Returns the root queue of the hierarchy.
 *
 * The backing field is declared without an initializer and is assigned in
 * initialize(), so callers that run before initialize() may see null.
 */
public FSParentQueue getRootQueue() {
return rootQueue;
}
public void initialize() throws IOException, SAXException, public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException { AllocationConfigurationException, ParserConfigurationException {
FairSchedulerConfiguration conf = scheduler.getConf(); FairSchedulerConfiguration conf = scheduler.getConf();
rootQueue = new FSParentQueue("root", this, scheduler, null);
queues.put(rootQueue.getName(), rootQueue);
this.allocFile = conf.getAllocationFile(); this.allocFile = conf.getAllocationFile();
if (allocFile == null) { if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation // No allocation file specified in jobconf. Use the default allocation
@ -106,21 +120,106 @@ public class QueueManager {
lastSuccessfulReload = scheduler.getClock().getTime(); lastSuccessfulReload = scheduler.getClock().getTime();
lastReloadAttempt = scheduler.getClock().getTime(); lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue // Create the default queue
getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
} }
/** /**
* Get a queue by name, creating it if necessary * Get a queue by name, creating it if necessary. If the queue
* is not or can not be a leaf queue, i.e. it already exists as a parent queue,
* or one of the parents in its name is already a leaf queue, null is returned.
*
* The root part of the name is optional, so a queue underneath the root
* named "queue1" could be referred to as just "queue1", and a queue named
* "queue2" underneath a parent named "parent1" that is underneath the root
* could be referred to as just "parent1.queue2".
*/ */
public FSQueue getQueue(String name) { public FSLeafQueue getLeafQueue(String name) {
if (!name.startsWith(ROOT_QUEUE + ".")) {
name = ROOT_QUEUE + "." + name;
}
synchronized (queues) { synchronized (queues) {
FSQueue queue = queues.get(name); FSQueue queue = queues.get(name);
if (queue == null) { if (queue == null) {
queue = new FSQueue(scheduler, name); FSLeafQueue leafQueue = createLeafQueue(name);
queue.setSchedulingMode(info.defaultSchedulingMode); if (leafQueue == null) {
queues.put(name, queue); return null;
}
leafQueue.setSchedulingMode(info.defaultSchedulingMode);
queue = leafQueue;
} else if (queue instanceof FSParentQueue) {
return null;
} }
return queue; return (FSLeafQueue)queue;
}
}
/**
 * Creates a leaf queue and places it in the tree. Creates any
 * parents that don't already exist.
 *
 * The visible caller, getLeafQueue, invokes this while synchronized on
 * the queues map; this method does no locking of its own.
 *
 * @param name
 *          fully qualified, dot-separated name of the leaf queue to create
 * @return
 *          the created queue, if successful. null if not allowed (one of the
 *          parent queues in the queue name is already a leaf queue) or if no
 *          ancestor named in the queue name is registered to attach it to
 */
private FSLeafQueue createLeafQueue(String name) {
  // Names of the queues that have to be created, deepest first: the leaf
  // itself at index 0, followed by each missing ancestor moving upwards.
  List<String> newQueueNames = new ArrayList<String>();
  newQueueNames.add(name);
  FSParentQueue parent = null;

  // Move up the queue tree until we reach one that exists. The separator
  // index is tested before it is used, so a name with no registered
  // ancestor ends the walk instead of reaching substring(0, -1).
  int sepIndex = name.lastIndexOf('.');
  while (sepIndex != -1) {
    String curName = name.substring(0, sepIndex);
    FSQueue queue = queues.get(curName);
    if (queue instanceof FSParentQueue) {
      parent = (FSParentQueue)queue;
      break;
    } else if (queue != null) {
      // An ancestor already exists as a leaf queue, so nothing can be
      // created underneath it.
      return null;
    }
    newQueueNames.add(curName);
    sepIndex = name.lastIndexOf('.', sepIndex - 1);
  }

  if (parent == null) {
    // No registered ancestor to hang the new queues from.
    return null;
  }

  // At this point, parent refers to the deepest existing parent of the
  // queue to create.
  // Now that we know everything worked out, make all the missing parent
  // queues, shallowest first, and add them to the map.
  for (int i = newQueueNames.size() - 1; i > 0; i--) {
    FSParentQueue newParent =
        new FSParentQueue(newQueueNames.get(i), this, scheduler, parent);
    parent.addChildQueue(newParent);
    queues.put(newParent.getName(), newParent);
    parent = newParent;
  }

  // Finally create the leaf itself underneath the deepest parent.
  FSLeafQueue leafQueue = new FSLeafQueue(name, this, scheduler, parent);
  parent.addChildQueue(leafQueue);
  queues.put(leafQueue.getName(), leafQueue);
  leafQueues.add(leafQueue);

  return leafQueue;
}
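/**
 * Gets an existing queue by name without creating it. The leading "root."
 * may be omitted from the name. Returns null if no such queue exists.
 */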
public FSQueue getQueue(String name) {
if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
name = ROOT_QUEUE + "." + name;
}
synchronized (queues) {
return queues.get(name);
} }
} }
@ -136,8 +235,8 @@ public class QueueManager {
/** /**
* Get the queue for a given AppSchedulable. * Get the queue for a given AppSchedulable.
*/ */
public FSQueue getQueueForApp(AppSchedulable app) { public FSLeafQueue getQueueForApp(AppSchedulable app) {
return getQueue(app.getApp().getQueueName()); return getLeafQueue(app.getApp().getQueueName());
} }
/** /**
@ -237,54 +336,9 @@ public class QueueManager {
Element element = (Element)node; Element element = (Element)node;
if ("queue".equals(element.getTagName()) || if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) { "pool".equals(element.getTagName())) {
String queueName = element.getAttribute("name"); loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
Map<QueueACL, AccessControlList> acls = userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
new HashMap<QueueACL, AccessControlList>(); queueAcls, queueNamesInAllocFile);
queueNamesInAllocFile.add(queueName);
NodeList fields = element.getChildNodes();
for (int j = 0; j < fields.getLength(); j++) {
Node fieldNode = fields.item(j);
if (!(fieldNode instanceof Element))
continue;
Element field = (Element) fieldNode;
if ("minResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
minQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxResources".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
maxQueueResources.put(queueName, Resources.createResource(val));
} else if ("maxRunningApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
queueMaxApps.put(queueName, val);
} else if ("weight".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
queueWeights.put(queueName, val);
} else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
queueModes.put(queueName, parseSchedulingMode(text));
} else if ("aclSubmitApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
} else if ("aclAdministerApps".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
}
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
&& Resources.lessThan(maxQueueResources.get(queueName),
minQueueResources.get(queueName))) {
LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
}
} else if ("user".equals(element.getTagName())) { } else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name"); String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes(); NodeList fields = element.getChildNodes();
@ -331,7 +385,7 @@ public class QueueManager {
queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts, queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) { for (String name: queueNamesInAllocFile) {
FSQueue queue = getQueue(name); FSLeafQueue queue = getLeafQueue(name);
if (queueModes.containsKey(name)) { if (queueModes.containsKey(name)) {
queue.setSchedulingMode(queueModes.get(name)); queue.setSchedulingMode(queueModes.get(name));
} else { } else {
@ -340,6 +394,75 @@ public class QueueManager {
} }
} }
} }
/**
 * Loads a queue from a queue element in the configuration file, recursing
 * into any nested "queue" or "pool" child elements.
 *
 * The queue's full name is {@code parentName + "." + name-attribute}. Each
 * recognised child element is parsed and stored under that full name in the
 * matching output map. A queue without nested queue/pool elements is treated
 * as a leaf and its name is appended to {@code queueNamesInAllocFile}.
 *
 * @param parentName full name of the enclosing queue
 * @param element the queue (or pool) element to read
 * @param minQueueResources receives the "minResources" value
 * @param maxQueueResources receives the "maxResources" value
 * @param queueMaxApps receives the "maxRunningApps" value
 * @param userMaxApps not modified here; only forwarded to nested queues
 * @param queueWeights receives the "weight" value
 * @param queueModes receives the parsed "schedulingMode" value
 * @param minSharePreemptionTimeouts receives "minSharePreemptionTimeout"
 *        multiplied by 1000 (presumably seconds converted to milliseconds)
 * @param queueAcls receives the ACL map built from "aclSubmitApps" and
 *        "aclAdministerApps" (an empty map when neither is present)
 * @param queueNamesInAllocFile receives the full names of leaf queues
 * @throws AllocationConfigurationException propagated from
 *         {@code parseSchedulingMode} or from loading a nested queue
 */
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
    Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
    Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
    Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
    Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
    throws AllocationConfigurationException {
  String queueName = parentName + "." + element.getAttribute("name");
  Map<QueueACL, AccessControlList> acls =
      new HashMap<QueueACL, AccessControlList>();
  NodeList fields = element.getChildNodes();
  boolean isLeaf = true;

  for (int j = 0; j < fields.getLength(); j++) {
    Node fieldNode = fields.item(j);
    if (!(fieldNode instanceof Element)) {
      continue;
    }
    Element field = (Element) fieldNode;
    String tagName = field.getTagName();
    if ("minResources".equals(tagName)) {
      int val = Integer.parseInt(readFieldText(field));
      minQueueResources.put(queueName, Resources.createResource(val));
    } else if ("maxResources".equals(tagName)) {
      int val = Integer.parseInt(readFieldText(field));
      maxQueueResources.put(queueName, Resources.createResource(val));
    } else if ("maxRunningApps".equals(tagName)) {
      int val = Integer.parseInt(readFieldText(field));
      queueMaxApps.put(queueName, val);
    } else if ("weight".equals(tagName)) {
      double val = Double.parseDouble(readFieldText(field));
      queueWeights.put(queueName, val);
    } else if ("minSharePreemptionTimeout".equals(tagName)) {
      long val = Long.parseLong(readFieldText(field)) * 1000L;
      minSharePreemptionTimeouts.put(queueName, val);
    } else if ("schedulingMode".equals(tagName)) {
      queueModes.put(queueName, parseSchedulingMode(readFieldText(field)));
    } else if ("aclSubmitApps".equals(tagName)) {
      acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(readFieldText(field)));
    } else if ("aclAdministerApps".equals(tagName)) {
      acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(readFieldText(field)));
    } else if ("queue".equals(tagName) || "pool".equals(tagName)) {
      // Exact match is required here: a suffix test would also accept
      // unrelated tags such as "ue" and treat them as nested queues.
      loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
          userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
          queueAcls, queueNamesInAllocFile);
      isLeaf = false;
    }
  }

  // Only queues without nested queues are recorded as leaves.
  if (isLeaf) {
    queueNamesInAllocFile.add(queueName);
  }
  queueAcls.put(queueName, acls);

  if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
      && Resources.lessThan(maxQueueResources.get(queueName),
          minQueueResources.get(queueName))) {
    // The values are Resource objects, so they must be formatted with %s;
    // %d would throw IllegalFormatConversionException instead of logging.
    LOG.warn(String.format("Queue %s has max resources %s less than min resources %s",
        queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
  }
}

/**
 * Returns the trimmed character data of the first child of the given field
 * element. The first child is cast to Text without checking, exactly as the
 * call sites did before this helper was extracted.
 */
private static String readFieldText(Element field) {
  return ((Text) field.getFirstChild()).getData().trim();
}
private SchedulingMode parseSchedulingMode(String text) private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException { throws AllocationConfigurationException {
@ -384,9 +507,9 @@ public class QueueManager {
/** /**
* Get a collection of all queues * Get a collection of all queues
*/ */
public Collection<FSQueue> getQueues() { public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) { synchronized (queues) {
return new ArrayList<FSQueue>(queues.values()); return leafQueues;
} }
} }

View File

@ -91,12 +91,6 @@ abstract class Schedulable {
/** Refresh the Schedulable's demand and those of its children if any. */ /** Refresh the Schedulable's demand and those of its children if any. */
public abstract void updateDemand(); public abstract void updateDemand();
/**
* Distribute the fair share assigned to this Schedulable among its
* children (used in queues where the internal scheduler is fair sharing).
*/
public abstract void redistributeShare();
/** /**
* Assign a container on this node if possible, and return the amount of * Assign a container on this node if possible, and return the amount of
* resources assigned. If {@code reserved} is true, it means a reservation * resources assigned. If {@code reserved} is true, it means a reservation

View File

@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List; import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
public class FairSchedulerInfo { public class FairSchedulerInfo {
@ -32,9 +32,9 @@ public class FairSchedulerInfo {
public FairSchedulerInfo(FairScheduler fs) { public FairSchedulerInfo(FairScheduler fs) {
scheduler = fs; scheduler = fs;
Collection<FSQueue> queues = fs.getQueueManager().getQueues(); Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
queueInfos = new ArrayList<FairSchedulerQueueInfo>(); queueInfos = new ArrayList<FairSchedulerQueueInfo>();
for (FSQueue queue : queues) { for (FSLeafQueue queue : queues) {
queueInfos.add(new FairSchedulerQueueInfo(queue, fs)); queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
} }
} }

View File

@ -22,9 +22,8 @@ import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueSchedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
@ -49,17 +48,16 @@ public class FairSchedulerQueueInfo {
private String queueName; private String queueName;
public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { public FairSchedulerQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
Collection<FSSchedulerApp> apps = queue.getApplications(); Collection<AppSchedulable> apps = queue.getAppSchedulables();
for (FSSchedulerApp app : apps) { for (AppSchedulable app : apps) {
if (app.isPending()) { if (app.getApp().isPending()) {
numPendingApps++; numPendingApps++;
} else { } else {
numActiveApps++; numActiveApps++;
} }
} }
FSQueueSchedulable schedulable = queue.getQueueSchedulable();
QueueManager manager = scheduler.getQueueManager(); QueueManager manager = scheduler.getQueueManager();
queueName = queue.getName(); queueName = queue.getName();
@ -67,11 +65,11 @@ public class FairSchedulerQueueInfo {
Resource clusterMax = scheduler.getClusterCapacity(); Resource clusterMax = scheduler.getClusterCapacity();
clusterMaxMem = clusterMax.getMemory(); clusterMaxMem = clusterMax.getMemory();
usedResources = schedulable.getResourceUsage(); usedResources = queue.getResourceUsage();
fractionUsed = (float)usedResources.getMemory() / clusterMaxMem; fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
fairShare = schedulable.getFairShare().getMemory(); fairShare = queue.getFairShare().getMemory();
minResources = schedulable.getMinShare(); minResources = queue.getMinShare();
minShare = minResources.getMemory(); minShare = minResources.getMemory();
maxResources = scheduler.getQueueManager().getMaxResources(queueName); maxResources = scheduler.getQueueManager().getMaxResources(queueName);
if (maxResources.getMemory() > clusterMaxMem) { if (maxResources.getMemory() > clusterMaxMem) {

View File

@ -107,9 +107,6 @@ public class FakeSchedulable extends Schedulable {
return minShare; return minShare;
} }
@Override
public void redistributeShare() {}
@Override @Override
public void updateDemand() {} public void updateDemand() {}
} }

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
 * Unit test for the demand computed by an FSLeafQueue whose maximum
 * resources are supplied by a mocked QueueManager.
 */
public class TestFSLeafQueue {
  private FSLeafQueue leafQueue = null;
  private Resource queueMax = Resources.createResource(10);

  /**
   * Brings up a FairScheduler against a freshly initialised ResourceManager
   * and creates the leaf queue under test.
   */
  @Before
  public void setup() throws IOException {
    FairScheduler fairScheduler = new FairScheduler();

    Configuration configuration = new YarnConfiguration();
    configuration.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
        ResourceScheduler.class);
    // All tests assume only one assignment per node update
    configuration.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");

    RMStateStore stateStore = StoreFactory.getStore(configuration);
    ResourceManager rm = new ResourceManager(stateStore);
    rm.init(configuration);
    AsyncDispatcher dispatcher = (AsyncDispatcher) rm.getRMContext().getDispatcher();
    dispatcher.start();
    fairScheduler.reinitialize(configuration, rm.getRMContext());

    String name = "root.queue1";
    QueueManager queueManager = mock(QueueManager.class);
    when(queueManager.getMaxResources(name)).thenReturn(queueMax);

    leafQueue = new FSLeafQueue(name, queueManager, fairScheduler, null);
  }

  /**
   * Adds the same app schedulable twice, each reporting a demand equal to
   * the queue maximum, and checks the queue demand equals that maximum.
   */
  @Test
  public void testUpdateDemand() {
    AppSchedulable demandingApp = mock(AppSchedulable.class);
    when(demandingApp.getDemand()).thenReturn(queueMax);

    for (int i = 0; i < 2; i++) {
      leafQueue.addAppSchedulable(demandingApp);
    }

    leafQueue.updateDemand();

    boolean demandEqualsMax = Resources.equals(leafQueue.getDemand(), queueMax);
    assertTrue("Demand is greater than max allowed ", demandEqualsMax);
  }
}

View File

@ -1,42 +0,0 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
 * Unit test for the demand computed by an FSQueueSchedulable whose queue
 * and QueueManager are both mocked.
 */
public class TestFSQueueSchedulable {
  private FSQueueSchedulable queueSchedulable = null;
  private Resource queueMax = Resources.createResource(10);

  /**
   * Builds the schedulable under test around a mocked queue and a mocked
   * manager that reports the maximum resources for that queue.
   */
  @Before
  public void setup() {
    String name = "testFSQueue";

    FSQueue queue = mock(FSQueue.class);
    when(queue.getName()).thenReturn(name);

    QueueManager queueManager = mock(QueueManager.class);
    when(queueManager.getMaxResources(name)).thenReturn(queueMax);

    queueSchedulable = new FSQueueSchedulable(null, queue, queueManager, null, 0, 0);
  }

  /**
   * Adds the same app twice, each reporting a demand equal to the queue
   * maximum, and checks the resulting demand equals that maximum.
   */
  @Test
  public void testUpdateDemand() {
    AppSchedulable demandingApp = mock(AppSchedulable.class);
    when(demandingApp.getDemand()).thenReturn(queueMax);

    for (int i = 0; i < 2; i++) {
      queueSchedulable.addApp(demandingApp);
    }

    queueSchedulable.updateDemand();

    boolean demandEqualsMax = Resources.equals(queueSchedulable.getDemand(), queueMax);
    assertTrue("Demand is greater than max allowed ", demandEqualsMax);
  }
}

View File

@ -31,6 +31,10 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.xml.parsers.ParserConfigurationException;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
@ -61,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.xml.sax.SAXException;
public class TestFairScheduler { public class TestFairScheduler {
@ -195,15 +200,64 @@ public class TestFairScheduler {
scheduler.update(); scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size()); assertEquals(3, queues.size());
for (FSQueue p : queues) { for (FSLeafQueue p : queues) {
if (p.getName() != "default") { if (!p.getName().equals("root.default")) {
assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory()); assertEquals(5120, p.getFairShare().getMemory());
} }
} }
} }
/**
 * Checks fair shares with one top-level queue and two queues nested under a
 * common parent: the top-level queue is expected to receive half of the
 * cluster and each nested queue a quarter.
 */
@Test
public void testSimpleHierarchicalFairShareCalculation() {
  // A single large node is enough; only the aggregate capacity matters
  int capacity = 10 * 24;
  RMNode bigNode = MockNodes.newNodeInfo(1, Resources.createResource(capacity));
  scheduler.handle(new NodeAddedSchedulerEvent(bigNode));

  // One request per queue: queue1 at the top level, queue2 and queue3
  // nested under "parent"
  String[] requestedQueues = {"queue1", "parent.queue2", "parent.queue3"};
  for (String requestedQueue : requestedQueues) {
    createSchedulingRequest(10 * 1024, requestedQueue, "user1");
  }

  scheduler.update();

  QueueManager manager = scheduler.getQueueManager();
  // Four leaf queues are expected: the three above plus, presumably, the
  // default queue
  Collection<FSLeafQueue> leaves = manager.getLeafQueues();
  assertEquals(4, leaves.size());

  FSLeafQueue topLevel = manager.getLeafQueue("queue1");
  FSLeafQueue firstChild = manager.getLeafQueue("parent.queue2");
  FSLeafQueue secondChild = manager.getLeafQueue("parent.queue3");
  assertEquals(capacity / 2, topLevel.getFairShare().getMemory());
  assertEquals(capacity / 4, firstChild.getFairShare().getMemory());
  assertEquals(capacity / 4, secondChild.getFairShare().getMemory());
}
/**
 * Checks leaf-queue lookup for names that collide with existing queues:
 * asking for an existing parent as a leaf, or for a child of an existing
 * leaf, must return null and leave the number of leaf queues unchanged.
 */
@Test
public void testHierarchicalQueuesSimilarParents() {
  QueueManager manager = scheduler.getQueueManager();

  // A leaf nested under a parent that did not exist before
  FSLeafQueue child = manager.getLeafQueue("parent.child");
  Assert.assertEquals(2, manager.getLeafQueues().size());
  Assert.assertNotNull(child);
  Assert.assertEquals("root.parent.child", child.getName());

  // "parent" is now a parent queue, so it is not available as a leaf
  FSLeafQueue parentAsLeaf = manager.getLeafQueue("parent");
  Assert.assertNull(parentAsLeaf);
  Assert.assertEquals(2, manager.getLeafQueues().size());

  // "parent.child" is a leaf, so nothing can be nested beneath it
  FSLeafQueue belowLeaf = manager.getLeafQueue("parent.child.grandchild");
  Assert.assertNull(belowLeaf);
  Assert.assertEquals(2, manager.getLeafQueues().size());

  // A second leaf under the same parent
  FSLeafQueue sibling = manager.getLeafQueue("parent.sister");
  Assert.assertNotNull(sibling);
  Assert.assertEquals("root.parent.sister", sibling.getName());
  Assert.assertEquals(3, manager.getLeafQueues().size());
}
@Test @Test
public void testSimpleContainerAllocation() { public void testSimpleContainerAllocation() {
@ -228,14 +282,14 @@ public class TestFairScheduler {
// Asked for less than min_allocation. // Asked for less than min_allocation.
assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
scheduler.getQueueManager().getQueue("queue1"). scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory()); getResourceUsage().getMemory());
NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2,
new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>()); new ArrayList<ContainerStatus>(), new ArrayList<ContainerStatus>());
scheduler.handle(updateEvent2); scheduler.handle(updateEvent2);
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory()); getResourceUsage().getMemory());
} }
@Test @Test
@ -254,7 +308,7 @@ public class TestFairScheduler {
// Make sure queue 1 is allocated app capacity // Make sure queue 1 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). assertEquals(1024, scheduler.getQueueManager().getQueue("queue1").
getQueueSchedulable().getResourceUsage().getMemory()); getResourceUsage().getMemory());
// Now queue 2 requests likewise // Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
@ -263,7 +317,7 @@ public class TestFairScheduler {
// Make sure queue 2 is waiting with a reservation // Make sure queue 2 is waiting with a reservation
assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
getQueueSchedulable().getResourceUsage().getMemory()); getResourceUsage().getMemory());
assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
// Now another node checks in with capacity // Now another node checks in with capacity
@ -276,7 +330,7 @@ public class TestFairScheduler {
// Make sure this goes to queue 2 // Make sure this goes to queue 2
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
getQueueSchedulable().getResourceUsage().getMemory()); getResourceUsage().getMemory());
// The old reservation should still be there... // The old reservation should still be there...
assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory());
@ -294,17 +348,22 @@ public class TestFairScheduler {
AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent(
createAppAttemptId(1, 1), "default", "user1"); createAppAttemptId(1, 1), "default", "user1");
scheduler.handle(appAddedEvent); scheduler.handle(appAddedEvent);
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1")
assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size()); .getAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default")
.getAppSchedulables().size());
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.reinitialize(conf, resourceManager.getRMContext());
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent( AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
createAppAttemptId(2, 1), "default", "user2"); createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2); scheduler.handle(appAddedEvent2);
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1")
assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size()); .getAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getQueue("user2").getApplications().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("default")
.getAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2")
.getAppSchedulables().size());
} }
@Test @Test
@ -338,18 +397,17 @@ public class TestFairScheduler {
scheduler.update(); scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size()); assertEquals(3, queues.size());
for (FSQueue p : queues) { for (FSLeafQueue p : queues) {
if (p.getName().equals("queueA")) { if (p.getName().equals("root.queueA")) {
assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory()); assertEquals(1024, p.getFairShare().getMemory());
} }
else if (p.getName().equals("queueB")) { else if (p.getName().equals("root.queueB")) {
assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory()); assertEquals(2048, p.getFairShare().getMemory());
} }
} }
} }
/** /**
@ -358,11 +416,11 @@ public class TestFairScheduler {
@Test @Test
public void testQueueDemandCalculation() throws Exception { public void testQueueDemandCalculation() throws Exception {
ApplicationAttemptId id11 = createAppAttemptId(1, 1); ApplicationAttemptId id11 = createAppAttemptId(1, 1);
scheduler.addApplication(id11, "queue1", "user1"); scheduler.addApplication(id11, "root.queue1", "user1");
ApplicationAttemptId id21 = createAppAttemptId(2, 1); ApplicationAttemptId id21 = createAppAttemptId(2, 1);
scheduler.addApplication(id21, "queue2", "user1"); scheduler.addApplication(id21, "root.queue2", "user1");
ApplicationAttemptId id22 = createAppAttemptId(2, 2); ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22, "queue2", "user1"); scheduler.addApplication(id22, "root.queue2", "user1");
int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB; int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB;
@ -388,10 +446,10 @@ public class TestFairScheduler {
scheduler.update(); scheduler.update();
assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("queue1") assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("root.queue1")
.getQueueSchedulable().getDemand().getMemory()); .getDemand().getMemory());
assertEquals(2 * minReqSize + 2 * minReqSize + (2 * minReqSize), scheduler assertEquals(2 * minReqSize + 2 * minReqSize + (2 * minReqSize), scheduler
.getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand() .getQueueManager().getQueue("root.queue2").getDemand()
.getMemory()); .getMemory());
} }
@ -402,10 +460,11 @@ public class TestFairScheduler {
scheduler.handle(appAddedEvent1); scheduler.handle(appAddedEvent1);
// Scheduler should have two queues (the default and the one created for user1) // Scheduler should have two queues (the default and the one created for user1)
assertEquals(2, scheduler.getQueueManager().getQueues().size()); assertEquals(2, scheduler.getQueueManager().getLeafQueues().size());
// That queue should have one app // That queue should have one app
assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1")
.getAppSchedulables().size());
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent( AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
@ -414,7 +473,8 @@ public class TestFairScheduler {
scheduler.handle(appRemovedEvent1); scheduler.handle(appRemovedEvent1);
// Queue should have no apps // Queue should have no apps
assertEquals(0, scheduler.getQueueManager().getQueue("user1").getApplications().size()); assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1")
.getAppSchedulables().size());
} }
@Test @Test
@ -466,60 +526,98 @@ public class TestFairScheduler {
QueueManager queueManager = scheduler.getQueueManager(); QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize(); queueManager.initialize();
assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024), assertEquals(Resources.createResource(1024),
queueManager.getMinResources("queueA")); queueManager.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048), assertEquals(Resources.createResource(2048),
queueManager.getMinResources("queueB")); queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueC")); queueManager.getMinResources("root.queueC"));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueD")); queueManager.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueE")); queueManager.getMinResources("root.queueE"));
assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("queueA")); assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
assertEquals(15, queueManager.getQueueMaxApps("queueB")); assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
assertEquals(15, queueManager.getQueueMaxApps("queueC")); assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
assertEquals(3, queueManager.getQueueMaxApps("queueD")); assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
assertEquals(15, queueManager.getQueueMaxApps("queueE")); assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
assertEquals(10, queueManager.getUserMaxApps("user1")); assertEquals(10, queueManager.getUserMaxApps("user1"));
assertEquals(5, queueManager.getUserMaxApps("user2")); assertEquals(5, queueManager.getUserMaxApps("user2"));
// Unspecified queues should get default ACL // Unspecified queues should get default ACL
Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("queueA"); Map<QueueACL, AccessControlList> aclsA = queueManager.getQueueAcls("root.queueA");
assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE)); assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString()); assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString());
assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS)); assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL // Queue B ACL
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB"); Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("root.queueB");
assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue c ACL // Queue c ACL
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC"); Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("root.queueC");
assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueManager.getMinSharePreemptionTimeout( assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME)); YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
} }
/**
 * Writes an allocation file in which queueB contains the nested queues
 * queueC and queueD, reloads the queue manager from it, and checks that the
 * expected leaf queues exist under their dotted names.
 */
@Test
public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException,
    AllocationConfigurationException, ParserConfigurationException {
  Configuration conf = createConfiguration();
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // Allocation file contents, one entry per output line.
  String[] allocationLines = {
    "<?xml version=\"1.0\"?>",
    "<allocations>",
    "<queue name=\"queueA\">",
    "<minResources>2048</minResources>",
    "</queue>",
    "<queue name=\"queueB\">",
    "<minResources>2048</minResources>",
    "<queue name=\"queueC\">",
    "<minResources>2048</minResources>",
    "</queue>",
    "<queue name=\"queueD\">",
    "<minResources>2048</minResources>",
    "</queue>",
    "</queue>",
    "</allocations>"
  };
  PrintWriter writer = new PrintWriter(new FileWriter(ALLOC_FILE));
  for (String line : allocationLines) {
    writer.println(line);
  }
  writer.close();

  // Reload the queue configuration from the file that was just written.
  QueueManager manager = scheduler.getQueueManager();
  manager.initialize();

  Collection<FSLeafQueue> leaves = manager.getLeafQueues();
  Assert.assertEquals(4, leaves.size());

  // Every expected leaf queue must be resolvable by its dotted name.
  String[] expectedLeafNames = {"queueA", "queueB.queueC", "queueB.queueD", "default"};
  for (String leafName : expectedLeafNames) {
    Assert.assertNotNull(manager.getLeafQueue(leafName));
  }

  // Make sure querying for queues didn't create any new ones:
  Assert.assertEquals(4, leaves.size());
}
@Test @Test
public void testBackwardsCompatibleAllocationFileParsing() throws Exception { public void testBackwardsCompatibleAllocationFileParsing() throws Exception {
Configuration conf = createConfiguration(); Configuration conf = createConfiguration();
@ -569,29 +667,29 @@ public class TestFairScheduler {
QueueManager queueManager = scheduler.getQueueManager(); QueueManager queueManager = scheduler.getQueueManager();
queueManager.initialize(); queueManager.initialize();
assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(1024), assertEquals(Resources.createResource(1024),
queueManager.getMinResources("queueA")); queueManager.getMinResources("root.queueA"));
assertEquals(Resources.createResource(2048), assertEquals(Resources.createResource(2048),
queueManager.getMinResources("queueB")); queueManager.getMinResources("root.queueB"));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueC")); queueManager.getMinResources("root.queueC"));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueD")); queueManager.getMinResources("root.queueD"));
assertEquals(Resources.createResource(0), assertEquals(Resources.createResource(0),
queueManager.getMinResources("queueE")); queueManager.getMinResources("root.queueE"));
assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(15, queueManager.getQueueMaxApps("queueA")); assertEquals(15, queueManager.getQueueMaxApps("root.queueA"));
assertEquals(15, queueManager.getQueueMaxApps("queueB")); assertEquals(15, queueManager.getQueueMaxApps("root.queueB"));
assertEquals(15, queueManager.getQueueMaxApps("queueC")); assertEquals(15, queueManager.getQueueMaxApps("root.queueC"));
assertEquals(3, queueManager.getQueueMaxApps("queueD")); assertEquals(3, queueManager.getQueueMaxApps("root.queueD"));
assertEquals(15, queueManager.getQueueMaxApps("queueE")); assertEquals(15, queueManager.getQueueMaxApps("root.queueE"));
assertEquals(10, queueManager.getUserMaxApps("user1")); assertEquals(10, queueManager.getUserMaxApps("user1"));
assertEquals(5, queueManager.getUserMaxApps("user2")); assertEquals(5, queueManager.getUserMaxApps("user2"));
@ -603,23 +701,23 @@ public class TestFairScheduler {
assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
// Queue B ACL // Queue B ACL
Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("queueB"); Map<QueueACL, AccessControlList> aclsB = queueManager.getQueueAcls("root.queueB");
assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE));
assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString());
// Queue c ACL // Queue c ACL
Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("queueC"); Map<QueueACL, AccessControlList> aclsC = queueManager.getQueueAcls("root.queueC");
assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS));
assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueManager.getMinSharePreemptionTimeout( assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME)); YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); assertEquals(300000, queueManager.getFairSharePreemptionTimeout());
} }
@ -659,25 +757,25 @@ public class TestFairScheduler {
// Queue B arrives and wants 1 * 1024 // Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1"); createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update(); scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size()); assertEquals(3, queues.size());
// Queue A should be above min share, B below. // Queue A should be above min share, B below.
for (FSQueue p : queues) { for (FSLeafQueue p : queues) {
if (p.getName().equals("queueA")) { if (p.getName().equals("root.queueA")) {
assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); assertEquals(false, scheduler.isStarvedForMinShare(p));
} }
else if (p.getName().equals("queueB")) { else if (p.getName().equals("root.queueB")) {
assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); assertEquals(true, scheduler.isStarvedForMinShare(p));
} }
} }
// Node checks in again, should allocate for B // Node checks in again, should allocate for B
scheduler.handle(nodeEvent2); scheduler.handle(nodeEvent2);
// Now B should have min share ( = demand here) // Now B should have min share ( = demand here)
for (FSQueue p : queues) { for (FSLeafQueue p : queues) {
if (p.getName().equals("queueB")) { if (p.getName().equals("root.queueB")) {
assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); assertEquals(false, scheduler.isStarvedForMinShare(p));
} }
} }
} }
@ -718,16 +816,16 @@ public class TestFairScheduler {
// Queue B arrives and wants 1 * 1024 // Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1"); createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update(); scheduler.update();
Collection<FSQueue> queues = scheduler.getQueueManager().getQueues(); Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size()); assertEquals(3, queues.size());
// Queue A should be above fair share, B below. // Queue A should be above fair share, B below.
for (FSQueue p : queues) { for (FSLeafQueue p : queues) {
if (p.getName().equals("queueA")) { if (p.getName().equals("root.queueA")) {
assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); assertEquals(false, scheduler.isStarvedForFairShare(p));
} }
else if (p.getName().equals("queueB")) { else if (p.getName().equals("root.queueB")) {
assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); assertEquals(true, scheduler.isStarvedForFairShare(p));
} }
} }
@ -735,9 +833,9 @@ public class TestFairScheduler {
scheduler.handle(nodeEvent2); scheduler.handle(nodeEvent2);
// B should not be starved for fair share, since entire demand is // B should not be starved for fair share, since entire demand is
// satisfied. // satisfied.
for (FSQueue p : queues) { for (FSLeafQueue p : queues) {
if (p.getName().equals("queueB")) { if (p.getName().equals("root.queueB")) {
assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); assertEquals(false, scheduler.isStarvedForFairShare(p));
} }
} }
} }
@ -845,7 +943,7 @@ public class TestFairScheduler {
// We should be able to claw back one container from A and B each. // We should be able to claw back one container from A and B each.
// Make sure it is lowest priority container. // Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueSchedulables(), scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024)); Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size());
@ -856,7 +954,7 @@ public class TestFairScheduler {
// We should be able to claw back another container from A and B each. // We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container. // Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueSchedulables(), scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024)); Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
@ -866,7 +964,7 @@ public class TestFairScheduler {
assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything // Now A and B are below fair share, so preemption shouldn't do anything
scheduler.preemptResources(scheduler.getQueueSchedulables(), scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024)); Resources.createResource(2 * 1024));
assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size());
assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size());
@ -977,10 +1075,10 @@ public class TestFairScheduler {
scheduler.update(); scheduler.update();
FSQueueSchedulable schedC = FSLeafQueue schedC =
scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable(); scheduler.getQueueManager().getLeafQueue("queueC");
FSQueueSchedulable schedD = FSLeafQueue schedD =
scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable(); scheduler.getQueueManager().getLeafQueue("queueD");
assertTrue(Resources.equals( assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(schedC, clock.getTime()))); Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));

View File

@ -53,6 +53,22 @@ Hadoop MapReduce Next Generation - Fair Scheduler
capacity between the running apps. queues can also be given weights to share capacity between the running apps. queues can also be given weights to share
the cluster non-proportionally in the config file. the cluster non-proportionally in the config file.
The fair scheduler supports hierarchical queues. All queues descend from a
queue named "root". Available resources are distributed among the children
of the root queue in the typical fair scheduling fashion. Then, the children
distribute the resources assigned to them to their children in the same
fashion. Applications may only be scheduled on leaf queues. Queues can be
specified as children of other queues by placing them as sub-elements of
their parents in the fair scheduler configuration file.
A queue's name starts with the names of its parents, with periods as
separators. So a queue named "queue1" under the root queue would be
referred to as "root.queue1", and a queue named "queue2" under a queue
named "parent1" would be referred to as "root.parent1.queue2". When
referring to queues, the root part of the name is optional, so queue1 could
be referred to as just "queue1", and queue2 could be referred to as just
"parent1.queue2".
In addition to providing fair sharing, the Fair Scheduler allows assigning In addition to providing fair sharing, the Fair Scheduler allows assigning
guaranteed minimum shares to queues, which is useful for ensuring that guaranteed minimum shares to queues, which is useful for ensuring that
certain users, groups or production applications always get sufficient certain users, groups or production applications always get sufficient
@ -163,11 +179,14 @@ Allocation file format
<?xml version="1.0"?> <?xml version="1.0"?>
<allocations> <allocations>
<queue name="sample_queue"> <queue name="sample_queue">
<minResources>100000</minResources> <minResources>10000</minResources>
<maxResources>900000</maxResources> <maxResources>90000</maxResources>
<maxRunningApps>50</maxRunningApps> <maxRunningApps>50</maxRunningApps>
<weight>2.0</weight> <weight>2.0</weight>
<schedulingMode>fair</schedulingMode> <schedulingMode>fair</schedulingMode>
<queue name="sample_sub_queue">
<minResources>5000</minResources>
</queue>
</queue> </queue>
<user name="sample_user"> <user name="sample_user">
<maxRunningApps>30</maxRunningApps> <maxRunningApps>30</maxRunningApps>