diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f2813f340a5..70ddcf114e9 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -100,6 +100,8 @@ Release 2.0.3-alpha - Unreleased YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy) + YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite) + Release 2.0.2-alpha - 2012-09-07 YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index acad730ee82..52ac0a25c8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -50,10 +50,10 @@ public class AppSchedulable extends Schedulable { private long startTime; private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private static final Log LOG = LogFactory.getLog(AppSchedulable.class); - private FSQueue queue; + private FSLeafQueue queue; private RMContainerTokenSecretManager containerTokenSecretManager; - public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSQueue queue) { + public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) { this.scheduler = scheduler; this.app = app; this.startTime = System.currentTimeMillis(); @@ -96,9 +96,6 @@ public long getStartTime() { return startTime; } - @Override - public void redistributeShare() {} - @Override public Resource getResourceUsage() { return app.getCurrentConsumption(); @@ -114,7 +111,7 @@ public Resource getMinShare() { * Get metrics reference from containing queue. */ public QueueMetrics getMetrics() { - return queue.getQueueSchedulable().getMetrics(); + return queue.getMetrics(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java similarity index 59% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index ccac112e6bb..8b3e13420b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -22,73 +22,57 @@ import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; -import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -@Private -@Unstable -public class FSQueueSchedulable extends Schedulable implements Queue { - public static final Log LOG = LogFactory.getLog( - FSQueueSchedulable.class.getName()); +public class FSLeafQueue extends FSQueue { + private static final Log LOG = LogFactory.getLog( + FSLeafQueue.class.getName()); + + private final List appScheds = + new ArrayList(); - private FairScheduler scheduler; - private FSQueue queue; - private QueueManager queueMgr; - private List appScheds = new LinkedList(); + /** Scheduling mode for jobs inside the queue (fair or FIFO) */ + private SchedulingMode schedulingMode; + + private final FairScheduler scheduler; + private final QueueManager queueMgr; private Resource demand = Resources.createResource(0); - private QueueMetrics metrics; - private RecordFactory recordFactory = - RecordFactoryProvider.getRecordFactory(null); - + // Variables used for preemption - long lastTimeAtMinShare; - long lastTimeAtHalfFairShare; - - // Constructor for tests - protected FSQueueSchedulable(FairScheduler scheduler, FSQueue fsQueue, - QueueManager qMgr, QueueMetrics metrics, long minShare, long fairShare) { + private long lastTimeAtMinShare; + private long lastTimeAtHalfFairShare; + + public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler, + FSParentQueue parent) { + super(name, queueMgr, scheduler, parent); this.scheduler = scheduler; - this.queueMgr = qMgr; - this.queue = fsQueue; - this.metrics = metrics; - this.lastTimeAtMinShare = minShare; - this.lastTimeAtHalfFairShare = fairShare; - } - - public FSQueueSchedulable(FairScheduler scheduler, FSQueue queue) { - this.scheduler = scheduler; - this.queue = queue; - this.queueMgr = scheduler.getQueueManager(); - this.metrics = QueueMetrics.forQueue(getName(), null, true, scheduler.getConf()); + this.queueMgr = queueMgr; this.lastTimeAtMinShare = scheduler.getClock().getTime(); this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); } - - public void addApp(AppSchedulable app) { - appScheds.add(app); + + public void addApp(FSSchedulerApp app) { + AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); + app.setAppSchedulable(appSchedulable); + appScheds.add(appSchedulable); } - + + // for testing + void addAppSchedulable(AppSchedulable appSched) { + appScheds.add(appSched); + } + public void removeApp(FSSchedulerApp app) { for (Iterator it = appScheds.iterator(); it.hasNext();) { AppSchedulable appSched = it.next(); @@ -98,17 +82,47 @@ public void removeApp(FSSchedulerApp app) { } } } + + public Collection getAppSchedulables() { + return appScheds; + } + + public void setSchedulingMode(SchedulingMode mode) { + this.schedulingMode = mode; + } + + @Override + public void recomputeFairShares() { + if (schedulingMode == SchedulingMode.FAIR) { + SchedulingAlgorithms.computeFairShares(appScheds, getFairShare()); + } else { + for (AppSchedulable sched: appScheds) { + sched.setFairShare(Resources.createResource(0)); + } + } + } + + @Override + public Resource getDemand() { + return demand; + } + + @Override + public Resource getResourceUsage() { + Resource usage = Resources.createResource(0); + for (AppSchedulable app : appScheds) { + Resources.addTo(usage, app.getResourceUsage()); + } + return usage; + } - /** - * Update demand by asking apps in the queue to update - */ @Override public void updateDemand() { // Compute demand by iterating through apps in the queue // Limit demand to maxResources - Resource maxRes = queueMgr.getMaxResources(queue.getName()); + Resource maxRes = queueMgr.getMaxResources(getName()); demand = Resources.createResource(0); - for (AppSchedulable sched: appScheds) { + for (AppSchedulable sched : appScheds) { sched.updateDemand(); Resource toAdd = sched.getDemand(); if (LOG.isDebugEnabled()) { @@ -128,46 +142,12 @@ public void updateDemand() { } } - /** - * Distribute the queue's fair share among its jobs - */ - @Override - public void redistributeShare() { - if (queue.getSchedulingMode() == SchedulingMode.FAIR) { - SchedulingAlgorithms.computeFairShares(appScheds, getFairShare()); - } else { - for (AppSchedulable sched: appScheds) { - sched.setFairShare(Resources.createResource(0)); - } - } - } - - @Override - public Resource getDemand() { - return demand; - } - - @Override - public Resource getMinShare() { - return queueMgr.getMinResources(queue.getName()); - } - - @Override - public double getWeight() { - return queueMgr.getQueueWeight(queue.getName()); - } - - @Override - public long getStartTime() { - return 0; - } - @Override public Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved); // If this queue is over its limit, reject if (Resources.greaterThan(getResourceUsage(), - queueMgr.getMaxResources(queue.getName()))) { + queueMgr.getMaxResources(getName()))) { return Resources.none(); } @@ -185,15 +165,14 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) { // Otherwise, chose app to schedule based on given policy (fair vs fifo). else { - SchedulingMode mode = queue.getSchedulingMode(); - Comparator comparator; - if (mode == SchedulingMode.FIFO) { + if (schedulingMode == SchedulingMode.FIFO) { comparator = new SchedulingAlgorithms.FifoComparator(); - } else if (mode == SchedulingMode.FAIR) { + } else if (schedulingMode == SchedulingMode.FAIR) { comparator = new SchedulingAlgorithms.FairShareComparator(); } else { - throw new RuntimeException("Unsupported queue scheduling mode " + mode); + throw new RuntimeException("Unsupported queue scheduling mode " + + schedulingMode); } Collections.sort(appScheds, comparator); @@ -203,81 +182,13 @@ public Resource assignContainer(FSSchedulerNode node, boolean reserved) { return Resources.none(); } - } @Override - public String getName() { - return queue.getName(); + public Collection getChildQueues() { + return new ArrayList(1); } - - FSQueue getQueue() { - return queue; - } - - public Collection getAppSchedulables() { - return appScheds; - } - - public long getLastTimeAtMinShare() { - return lastTimeAtMinShare; - } - - public void setLastTimeAtMinShare(long lastTimeAtMinShare) { - this.lastTimeAtMinShare = lastTimeAtMinShare; - } - - public long getLastTimeAtHalfFairShare() { - return lastTimeAtHalfFairShare; - } - - public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { - this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; - } - - @Override - public QueueMetrics getMetrics() { - return metrics; - } - - @Override - public Resource getResourceUsage() { - Resource usage = Resources.createResource(0); - for (AppSchedulable app : appScheds) { - Resources.addTo(usage, app.getResourceUsage()); - } - return usage; - } - - @Override - public Priority getPriority() { - Priority p = recordFactory.newRecordInstance(Priority.class); - p.setPriority(1); - return p; - } - - @Override - public Map getQueueAcls() { - Map acls = queueMgr.getQueueAcls(getName()); - return new HashMap(acls); - } - - @Override - public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { - QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); - queueInfo.setQueueName(getQueueName()); - // TODO: we might change these queue metrics around a little bit - // to match the semantics of the fair scheduler. - queueInfo.setCapacity((float) getFairShare().getMemory() / - scheduler.getClusterCapacity().getMemory()); - queueInfo.setCapacity((float) getResourceUsage().getMemory() / - scheduler.getClusterCapacity().getMemory()); - - queueInfo.setChildQueues(new ArrayList()); - queueInfo.setQueueState(QueueState.RUNNING); - return queueInfo; - } - + @Override public List getQueueUserAclInfo(UserGroupInformation user) { QueueUserACLInfo userAclInfo = @@ -294,9 +205,20 @@ public List getQueueUserAclInfo(UserGroupInformation user) { userAclInfo.setUserAcls(operations); return Collections.singletonList(userAclInfo); } + + public long getLastTimeAtMinShare() { + return lastTimeAtMinShare; + } - @Override - public String getQueueName() { - return getName(); + public void setLastTimeAtMinShare(long lastTimeAtMinShare) { + this.lastTimeAtMinShare = lastTimeAtMinShare; + } + + public long getLastTimeAtHalfFairShare() { + return lastTimeAtHalfFairShare; + } + + public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { + this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java new file mode 100644 index 00000000000..4ae6b362f92 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; + +public class FSParentQueue extends FSQueue { + private static final Log LOG = LogFactory.getLog( + FSParentQueue.class.getName()); + + + private final List childQueues = + new ArrayList(); + private final QueueManager queueMgr; + private Resource demand = Resources.createResource(0); + + public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler, + FSParentQueue parent) { + super(name, queueMgr, scheduler, parent); + this.queueMgr = queueMgr; + } + + public void addChildQueue(FSQueue child) { + childQueues.add(child); + } + + @Override + public void recomputeFairShares() { + SchedulingAlgorithms.computeFairShares(childQueues, getFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare()); + childQueue.recomputeFairShares(); + } + } + + @Override + public Resource getDemand() { + return demand; + } + + @Override + public Resource getResourceUsage() { + Resource usage = Resources.createResource(0); + for (FSQueue child : childQueues) { + Resources.addTo(usage, child.getResourceUsage()); + } + return usage; + } + + @Override + public void updateDemand() { + // Compute demand by iterating through apps in the queue + // Limit demand to maxResources + Resource maxRes = queueMgr.getMaxResources(getName()); + demand = Resources.createResource(0); + for (FSQueue childQueue : childQueues) { + childQueue.updateDemand(); + Resource toAdd = childQueue.getDemand(); + if (LOG.isDebugEnabled()) { + LOG.debug("Counting resource from " + childQueue.getName() + " " + + toAdd + "; Total resource consumption for " + getName() + + " now " + demand); + } + demand = Resources.add(demand, toAdd); + if (Resources.greaterThanOrEqual(demand, maxRes)) { + demand = maxRes; + break; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("The updated demand for " + getName() + " is " + demand + + "; the max is " + maxRes); + } + } + + public boolean hasAccess(QueueACL acl, UserGroupInformation user) { + synchronized (this) { + if (getQueueAcls().get(acl).isUserAllowed(user)) { + return true; + } + } + + if (parent != null) { + return parent.hasAccess(acl, user); + } + + return false; + } + + private synchronized QueueUserACLInfo getUserAclInfo( + UserGroupInformation user) { + QueueUserACLInfo userAclInfo = + recordFactory.newRecordInstance(QueueUserACLInfo.class); + List operations = new ArrayList(); + for (QueueACL operation : QueueACL.values()) { + if (hasAccess(operation, user)) { + operations.add(operation); + } + } + + userAclInfo.setQueueName(getQueueName()); + userAclInfo.setUserAcls(operations); + return userAclInfo; + } + + @Override + public synchronized List getQueueUserAclInfo( + UserGroupInformation user) { + List userAcls = new ArrayList(); + + // Add queue acls + userAcls.add(getUserAclInfo(user)); + + // Add children queue acls + for (FSQueue child : childQueues) { + userAcls.addAll(child.getQueueUserAclInfo(user)); + } + + return userAcls; + } + + @Override + public Resource assignContainer(FSSchedulerNode node, boolean reserved) { + throw new IllegalStateException( + "Parent queue should not be assigned container"); + } + + @Override + public Collection getChildQueues() { + return childQueues; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 79395b0d1d2..0a85cbc5f1a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -20,65 +20,112 @@ import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; +import java.util.Map; -import org.apache.hadoop.classification.InterfaceAudience.Private; -import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; -/** - * A queue containing several applications. - */ -@Private -@Unstable -public class FSQueue { - /** Queue name. */ - private String name; - - /** Applications in this specific queue; does not include children queues' jobs. */ - private Collection applications = - new ArrayList(); - - /** Scheduling mode for jobs inside the queue (fair or FIFO) */ - private SchedulingMode schedulingMode; - - private FairScheduler scheduler; - - private FSQueueSchedulable queueSchedulable; - - public FSQueue(FairScheduler scheduler, String name) { +public abstract class FSQueue extends Schedulable implements Queue { + private final String name; + private final QueueManager queueMgr; + private final FairScheduler scheduler; + private final QueueMetrics metrics; + + protected final FSParentQueue parent; + protected final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + public FSQueue(String name, QueueManager queueMgr, + FairScheduler scheduler, FSParentQueue parent) { this.name = name; - this.queueSchedulable = new FSQueueSchedulable(scheduler, this); + this.queueMgr = queueMgr; this.scheduler = scheduler; + this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf()); + this.parent = parent; } - - public Collection getApplications() { - return applications; - } - - public void addApp(FSSchedulerApp app) { - applications.add(app); - AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this); - app.setAppSchedulable(appSchedulable); - queueSchedulable.addApp(appSchedulable); - } - - public void removeApp(FSSchedulerApp app) { - applications.remove(app); - queueSchedulable.removeApp(app); - } - + public String getName() { return name; } - - public SchedulingMode getSchedulingMode() { - return schedulingMode; + + @Override + public String getQueueName() { + return name; + } + + @Override + public double getWeight() { + return queueMgr.getQueueWeight(getName()); + } + + @Override + public Resource getMinShare() { + return queueMgr.getMinResources(getName()); } - public void setSchedulingMode(SchedulingMode schedulingMode) { - this.schedulingMode = schedulingMode; + @Override + public long getStartTime() { + return 0; } - public FSQueueSchedulable getQueueSchedulable() { - return queueSchedulable; + @Override + public Priority getPriority() { + Priority p = recordFactory.newRecordInstance(Priority.class); + p.setPriority(1); + return p; } + + @Override + public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) { + QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); + queueInfo.setQueueName(getQueueName()); + // TODO: we might change these queue metrics around a little bit + // to match the semantics of the fair scheduler. + queueInfo.setCapacity((float) getFairShare().getMemory() / + scheduler.getClusterCapacity().getMemory()); + queueInfo.setCapacity((float) getResourceUsage().getMemory() / + scheduler.getClusterCapacity().getMemory()); + + ArrayList childQueueInfos = new ArrayList(); + if (includeChildQueues) { + Collection childQueues = getChildQueues(); + for (FSQueue child : childQueues) { + childQueueInfos.add(child.getQueueInfo(recursive, recursive)); + } + } + queueInfo.setChildQueues(childQueueInfos); + queueInfo.setQueueState(QueueState.RUNNING); + return queueInfo; + } + + @Override + public Map getQueueAcls() { + Map acls = queueMgr.getQueueAcls(getName()); + return new HashMap(acls); + } + + @Override + public QueueMetrics getMetrics() { + return metrics; + } + + /** + * Recomputes the fair shares for all queues and applications + * under this queue. + */ + public abstract void recomputeFairShares(); + + /** + * Gets the children of this queue, if any. + */ + public abstract Collection getChildQueues(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index bf38e7d5238..f0b9949dc85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +/** + * A scheduler that schedules resources between a set of queues. The scheduler + * keeps track of the resources used by each queue, and attempts to maintain + * fairness by scheduling tasks at queues whose allocations are farthest below + * an ideal fair distribution. + * + * The fair scheduler supports hierarchical queues. All queues descend from a + * queue named "root". Available resources are distributed among the children + * of the root queue in the typical fair scheduling fashion. Then, the children + * distribute the resources assigned to them to their children in the same + * fashion. Applications may only be scheduled on leaf queues. Queues can be + * specified as children of other queues by placing them as sub-elements of their + * parents in the fair scheduler configuration file. + * + * A queue's name starts with the names of its parents, with periods as + * separators. So a queue named "queue1" under the root named, would be + * referred to as "root.queue1", and a queue named "queue2" under a queue + * named "parent1" would be referred to as "root.parent1.queue2". + */ @LimitedPrivate("yarn") @Unstable @SuppressWarnings("unchecked") @@ -105,23 +125,22 @@ public class FairScheduler implements ResourceScheduler { // Aggregate metrics QueueMetrics rootMetrics; - //Time when we last updated preemption vars + // Time when we last updated preemption vars protected long lastPreemptionUpdateTime; - //Time we last ran preemptTasksIfNecessary + // Time we last ran preemptTasksIfNecessary private long lastPreemptCheckTime; - // This stores per-application scheduling information, indexed by // attempt ID's for fast lookup. - protected Map applications - = new HashMap(); + protected Map applications = + new HashMap(); // Nodes in the cluster, indexed by NodeId - private Map nodes = + private Map nodes = new ConcurrentHashMap(); // Aggregate capacity of the cluster - private Resource clusterCapacity = + private Resource clusterCapacity = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class); // How often tasks are preempted (must be longer than a couple @@ -131,10 +150,11 @@ public class FairScheduler implements ResourceScheduler { protected boolean preemptionEnabled; protected boolean sizeBasedWeight; // Give larger weights to larger jobs protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster - protected double nodeLocalityThreshold; // Cluster threshold for node locality - protected double rackLocalityThreshold; // Cluster threshold for rack locality - private FairSchedulerEventLog eventLog; // Machine-readable event log - protected boolean assignMultiple; // Allocate multiple containers per heartbeat + protected double nodeLocalityThreshold; // Cluster threshold for node locality + protected double rackLocalityThreshold; // Cluster threshold for rack locality + private FairSchedulerEventLog eventLog; // Machine-readable event log + protected boolean assignMultiple; // Allocate multiple containers per + // heartbeat protected int maxAssign; // Max containers to assign per heartbeat public FairScheduler() { @@ -150,16 +170,8 @@ public QueueManager getQueueManager() { return queueMgr; } - public List getQueueSchedulables() { - List scheds = new ArrayList(); - for (FSQueue queue: queueMgr.getQueues()) { - scheds.add(queue.getQueueSchedulable()); - } - return scheds; - } - private RMContainer getRMContainer(ContainerId containerId) { - FSSchedulerApp application = + FSSchedulerApp application = applications.get(containerId.getApplicationAttemptId()); return (application == null) ? null : application.getRMContainer(containerId); } @@ -183,34 +195,24 @@ public void run() { } /** - * Recompute the internal variables used by the scheduler - per-job weights, - * fair shares, deficits, minimum slot allocations, and amount of used and - * required resources per job. - */ + * Recompute the internal variables used by the scheduler - per-job weights, + * fair shares, deficits, minimum slot allocations, and amount of used and + * required resources per job. + */ protected synchronized void update() { queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file updateRunnability(); // Set job runnability based on user/queue limits updatePreemptionVariables(); // Determine if any queues merit preemption - // Update demands of apps and queues - for (FSQueue queue: queueMgr.getQueues()) { - queue.getQueueSchedulable().updateDemand(); - } + FSQueue rootQueue = queueMgr.getRootQueue(); - // Compute fair shares based on updated demands - List queueScheds = getQueueSchedulables(); - SchedulingAlgorithms.computeFairShares( - queueScheds, clusterCapacity); + // Recursively update demands for all queues + rootQueue.updateDemand(); - // Update queue metrics for this queue - for (FSQueueSchedulable sched : queueScheds) { - sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare()); - } - - // Use the computed shares to assign shares within each queue - for (FSQueue queue: queueMgr.getQueues()) { - queue.getQueueSchedulable().redistributeShare(); - } + rootQueue.setFairShare(clusterCapacity); + // Recursively compute fair shares for all queues + // and update metrics + rootQueue.recomputeFairShares(); // Update recorded capacity of root queue (child queues are updated // when fair share is calculated). @@ -225,7 +227,7 @@ protected synchronized void update() { private void updatePreemptionVariables() { long now = clock.getTime(); lastPreemptionUpdateTime = now; - for (FSQueueSchedulable sched: getQueueSchedulables()) { + for (FSLeafQueue sched : queueMgr.getLeafQueues()) { if (!isStarvedForMinShare(sched)) { sched.setLastTimeAtMinShare(now); } @@ -238,16 +240,16 @@ private void updatePreemptionVariables() { /** * Is a queue below its min share for the given task type? */ - boolean isStarvedForMinShare(FSQueueSchedulable sched) { + boolean isStarvedForMinShare(FSLeafQueue sched) { Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand()); return Resources.lessThan(sched.getResourceUsage(), desiredShare); } /** - * Is a queue being starved for fair share for the given task type? - * This is defined as being below half its fair share. + * Is a queue being starved for fair share for the given task type? This is + * defined as being below half its fair share. */ - boolean isStarvedForFairShare(FSQueueSchedulable sched) { + boolean isStarvedForFairShare(FSLeafQueue sched) { Resource desiredFairShare = Resources.max( Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); return Resources.lessThan(sched.getResourceUsage(), desiredFairShare); @@ -255,10 +257,10 @@ boolean isStarvedForFairShare(FSQueueSchedulable sched) { /** * Check for queues that need tasks preempted, either because they have been - * below their guaranteed share for minSharePreemptionTimeout or they - * have been below half their fair share for the fairSharePreemptionTimeout. - * If such queues exist, compute how many tasks of each type need to be - * preempted and then select the right ones using preemptTasks. + * below their guaranteed share for minSharePreemptionTimeout or they have + * been below half their fair share for the fairSharePreemptionTimeout. If + * such queues exist, compute how many tasks of each type need to be preempted + * and then select the right ones using preemptTasks. */ protected synchronized void preemptTasksIfNecessary() { if (!preemptionEnabled) { @@ -273,35 +275,37 @@ protected synchronized void preemptTasksIfNecessary() { Resource resToPreempt = Resources.none(); - for (FSQueueSchedulable sched: getQueueSchedulables()) { + for (FSLeafQueue sched : queueMgr.getLeafQueues()) { resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime)); } if (Resources.greaterThan(resToPreempt, Resources.none())) { - preemptResources(getQueueSchedulables(), resToPreempt); + preemptResources(queueMgr.getLeafQueues(), resToPreempt); } } /** - * Preempt a quantity of resources from a list of QueueSchedulables. - * The policy for this is to pick apps from queues that are over their fair - * share, but make sure that no queue is placed below its fair share in the - * process. We further prioritize preemption by choosing containers with - * lowest priority to preempt. + * Preempt a quantity of resources from a list of QueueSchedulables. The + * policy for this is to pick apps from queues that are over their fair share, + * but make sure that no queue is placed below its fair share in the process. + * We further prioritize preemption by choosing containers with lowest + * priority to preempt. */ - protected void preemptResources(List scheds, Resource toPreempt) { + protected void preemptResources(Collection scheds, + Resource toPreempt) { if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { return; } Map apps = new HashMap(); - Map queues = new HashMap(); + Map queues = + new HashMap(); // Collect running containers from over-scheduled queues List runningContainers = new ArrayList(); - for (FSQueueSchedulable sched: scheds) { + for (FSLeafQueue sched : scheds) { if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) { - for (AppSchedulable as: sched.getAppSchedulables()) { + for (AppSchedulable as : sched.getAppSchedulables()) { for (RMContainer c : as.getApp().getLiveContainers()) { runningContainers.add(c); apps.put(c, as.getApp()); @@ -321,12 +325,12 @@ public int compare(RMContainer c1, RMContainer c2) { // Scan down the sorted list of task statuses until we've killed enough // tasks, making sure we don't kill too many from any queue - for (RMContainer container: runningContainers) { - FSQueueSchedulable sched = queues.get(container); + for (RMContainer container : runningContainers) { + FSLeafQueue sched = queues.get(container); if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) { LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + - ") from queue " + sched.getQueue().getName()); + ") from queue " + sched.getName()); ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus( container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER); @@ -348,12 +352,12 @@ public int compare(RMContainer c1, RMContainer c2) { * If the queue has been below its min share for at least its preemption * timeout, it should preempt the difference between its current share and * this min share. If it has been below half its fair share for at least the - * fairSharePreemptionTimeout, it should preempt enough tasks to get up to - * its full fair share. If both conditions hold, we preempt the max of the - * two amounts (this shouldn't happen unless someone sets the timeouts to - * be identical for some reason). + * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its + * full fair share. If both conditions hold, we preempt the max of the two + * amounts (this shouldn't happen unless someone sets the timeouts to be + * identical for some reason). */ - protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) { + protected Resource resToPreempt(FSLeafQueue sched, long curTime) { String queue = sched.getName(); long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue); long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout(); @@ -362,7 +366,7 @@ protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) { if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { Resource target = Resources.min(sched.getMinShare(), sched.getDemand()); resDueToMinShare = Resources.max(Resources.none(), - Resources.subtract(target, sched.getResourceUsage())); + Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { Resource target = Resources.min(sched.getFairShare(), sched.getDemand()); @@ -380,15 +384,15 @@ protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) { } /** - * This updates the runnability of all apps based on whether or not - * any users/queues have exceeded their capacity. + * This updates the runnability of all apps based on whether or not any + * users/queues have exceeded their capacity. */ private void updateRunnability() { List apps = new ArrayList(); // Start by marking everything as not runnable - for (FSQueue p: queueMgr.getQueues()) { - for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) { + for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) { + for (AppSchedulable a : leafQueue.getAppSchedulables()) { a.setRunnable(false); apps.add(a); } @@ -400,7 +404,7 @@ private void updateRunnability() { Map userApps = new HashMap(); Map queueApps = new HashMap(); - for (AppSchedulable app: apps) { + for (AppSchedulable app : apps) { String user = app.getApp().getUser(); String queue = app.getApp().getQueueName(); int userCount = userApps.containsKey(user) ? userApps.get(user) : 0; @@ -473,22 +477,25 @@ public FairSchedulerEventLog getEventLog() { } /** - * Add a new application to the scheduler, with a given id, queue name, - * and user. This will accept a new app even if the user or queue is above + * Add a new application to the scheduler, with a given id, queue name, and + * user. This will accept a new app even if the user or queue is above * configured limits, but the app will not be marked as runnable. */ - protected synchronized void - addApplication(ApplicationAttemptId applicationAttemptId, - String queueName, String user) { + protected synchronized void addApplication( + ApplicationAttemptId applicationAttemptId, String queueName, String user) { - FSQueue queue = queueMgr.getQueue(queueName); + FSLeafQueue queue = queueMgr.getLeafQueue(queueName); + if (queue == null) { + // queue is not an existing or createable leaf queue + queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + } FSSchedulerApp schedulerApp = new FSSchedulerApp(applicationAttemptId, user, - queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()), + queue, new ActiveUsersManager(getRootQueueMetrics()), rmContext); - - // Inforce ACLs + + // Enforce ACLs UserGroupInformation userUgi; try { userUgi = UserGroupInformation.getCurrentUser(); @@ -497,8 +504,8 @@ public FairSchedulerEventLog getEventLog() { return; } - List info = queue.getQueueSchedulable().getQueueUserAclInfo( - userUgi); // Always a signleton list + // Always a singleton list + List info = queue.getQueueUserAclInfo(userUgi); if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) { LOG.info("User " + userUgi.getUserName() + " cannot submit" + " applications to queue " + queue.getName()); @@ -506,14 +513,13 @@ public FairSchedulerEventLog getEventLog() { } queue.addApp(schedulerApp); - queue.getQueueSchedulable().getMetrics().submitApp(user, - applicationAttemptId.getAttemptId()); + queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId()); rootMetrics.submitApp(user, applicationAttemptId.getAttemptId()); applications.put(applicationAttemptId, schedulerApp); LOG.info("Application Submission: " + applicationAttemptId + - ", user: " + user + + ", user: "+ user + ", currently active: " + applications.size()); rmContext.getDispatcher().getEventHandler().handle( @@ -540,10 +546,10 @@ private synchronized void removeApplication( SchedulerUtils.createAbnormalContainerStatus( rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION), - RMContainerEventType.KILL); + RMContainerEventType.KILL); } - // Release all reserved containers + // Release all reserved containers for (RMContainer rmContainer : application.getReservedContainers()) { completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( @@ -556,7 +562,8 @@ private synchronized void removeApplication( application.stop(rmAppAttemptFinalState); // Inform the queue - FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName()); + FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue() + .getQueueName()); queue.removeApp(application); // Remove from our data-structure @@ -658,11 +665,11 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, for (ContainerId releasedContainerId : release) { RMContainer rmContainer = getRMContainer(releasedContainerId); if (rmContainer == null) { - RMAuditLogger.logFailure(application.getUser(), - AuditConstants.RELEASE_CONTAINER, - "Unauthorized access or invalid container", "FairScheduler", - "Trying to release container not owned by app or with invalid id", - application.getApplicationId(), releasedContainerId); + RMAuditLogger.logFailure(application.getUser(), + AuditConstants.RELEASE_CONTAINER, + "Unauthorized access or invalid container", "FairScheduler", + "Trying to release container not owned by app or with invalid id", + application.getApplicationId(), releasedContainerId); } completedContainer(rmContainer, SchedulerUtils.createAbnormalContainerStatus( @@ -675,8 +682,8 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, if (!ask.isEmpty()) { if (LOG.isDebugEnabled()) { LOG.debug("allocate: pre-update" + - " applicationAttemptId=" + appAttemptId + - " application=" + application.getApplicationId()); + " applicationAttemptId=" + appAttemptId + + " application=" + application.getApplicationId()); } application.showRequests(); @@ -689,19 +696,17 @@ public Allocation allocate(ApplicationAttemptId appAttemptId, if (LOG.isDebugEnabled()) { LOG.debug("allocate:" + - " applicationAttemptId=" + appAttemptId + - " #ask=" + ask.size()); + " applicationAttemptId=" + appAttemptId + + " #ask=" + ask.size()); } - return new Allocation( - application.pullNewlyAllocatedContainers(), + return new Allocation(application.pullNewlyAllocatedContainers(), application.getHeadroom()); } } /** - * Process a container which has launched on a node, as reported by the - * node. + * Process a container which has launched on a node, as reported by the node. */ private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) { // Get the application for the finished container @@ -757,20 +762,20 @@ private synchronized void nodeUpdate(RMNode nm, LOG.info("Trying to fulfill reservation for application " + reservedApplication.getApplicationId() + " on node: " + nm); - FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName()); - queue.getQueueSchedulable().assignContainer(node, true); + FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName()); + queue.assignContainer(node, true); } - // Otherwise, schedule at queue which is furthest below fair share else { int assignedContainers = 0; while (true) { // At most one task is scheduled each iteration of this loop - List scheds = getQueueSchedulables(); + List scheds = new ArrayList( + queueMgr.getLeafQueues()); Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator()); boolean assignedContainer = false; - for (FSQueueSchedulable sched : scheds) { + for (FSLeafQueue sched : scheds) { Resource assigned = sched.assignContainer(node, false); if (Resources.greaterThan(assigned, Resources.none())) { eventLog.log("ASSIGN", nm.getHostName(), assigned); @@ -813,7 +818,7 @@ public QueueMetrics getRootQueueMetrics() { @Override public void handle(SchedulerEvent event) { - switch(event.getType()) { + switch (event.getType()) { case NODE_ADDED: if (!(event instanceof NodeAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); @@ -832,8 +837,7 @@ public void handle(SchedulerEvent event) { if (!(event instanceof NodeUpdateSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } - NodeUpdateSchedulerEvent nodeUpdatedEvent = - (NodeUpdateSchedulerEvent)event; + NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; nodeUpdate(nodeUpdatedEvent.getRMNode(), nodeUpdatedEvent.getNewlyLaunchedContainers(), nodeUpdatedEvent.getCompletedContainers()); @@ -842,7 +846,7 @@ public void handle(SchedulerEvent event) { if (!(event instanceof AppAddedSchedulerEvent)) { throw new RuntimeException("Unexpected event type: " + event); } - AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event; + AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event; String queue = appAddedEvent.getQueue(); // Potentially set queue to username if configured to do so @@ -867,7 +871,7 @@ public void handle(SchedulerEvent event) { throw new RuntimeException("Unexpected event type: " + event); } ContainerExpiredSchedulerEvent containerExpiredEvent = - (ContainerExpiredSchedulerEvent) event; + (ContainerExpiredSchedulerEvent)event; ContainerId containerId = containerExpiredEvent.getContainerId(); completedContainer(getRMContainer(containerId), SchedulerUtils.createAbnormalContainerStatus( @@ -886,8 +890,8 @@ public void recover(RMState state) throws Exception { } @Override - public synchronized void - reinitialize(Configuration conf, RMContext rmContext) throws IOException { + public synchronized void reinitialize(Configuration conf, RMContext rmContext) + throws IOException { if (!initialized) { this.conf = new FairSchedulerConfiguration(conf); rootMetrics = QueueMetrics.forQueue("root", null, true, conf); @@ -909,11 +913,10 @@ public void recover(RMState state) throws Exception { try { queueMgr.initialize(); - } - catch (Exception e) { + } catch (Exception e) { throw new IOException("Failed to start FairScheduler", e); } - + Thread updateThread = new Thread(new UpdateThread()); updateThread.setName("FairSchedulerUpdateThread"); updateThread.setDaemon(true); @@ -925,10 +928,9 @@ public void recover(RMState state) throws Exception { rackLocalityThreshold = this.conf.getLocalityThresholdRack(); preemptionEnabled = this.conf.getPreemptionEnabled(); try { - queueMgr.reloadAllocs(); + queueMgr.reloadAllocs(); - } - catch (Exception e) { + } catch (Exception e) { throw new IOException("Failed to initialize FairScheduler", e); } } @@ -940,8 +942,8 @@ public QueueInfo getQueueInfo(String queueName, boolean includeChildQueues, if (!queueMgr.exists(queueName)) { return null; } - return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo( - includeChildQueues, recursive); + return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues, + recursive); } @Override @@ -953,12 +955,7 @@ public List getQueueUserAclInfo() { return new ArrayList(); } - List userAcls = new ArrayList(); - - for (FSQueue queue : queueMgr.getQueues()) { - userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user)); - } - return userAcls; + return queueMgr.getRootQueue().getQueueUserAclInfo(user); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 2da306e2dce..5da1d4fd5bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; @@ -52,6 +53,7 @@ /** * Maintains a list of queues as well as scheduling parameters for each queue, * such as guaranteed share allocations, from the fair scheduler config file. + * */ @Private @Unstable @@ -59,6 +61,8 @@ public class QueueManager { public static final Log LOG = LogFactory.getLog( QueueManager.class.getName()); + public static final String ROOT_QUEUE = "root"; + /** Time to wait between checks of the allocation file */ public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000; @@ -76,7 +80,10 @@ public class QueueManager { // used) or a String to specify an absolute path (if // mapred.fairscheduler.allocation.file is used). - private Map queues = new HashMap(); + private final Collection leafQueues = + new CopyOnWriteArrayList(); + private final Map queues = new HashMap(); + private FSParentQueue rootQueue; private volatile QueueManagerInfo info = new QueueManagerInfo(); @@ -87,10 +94,17 @@ public class QueueManager { public QueueManager(FairScheduler scheduler) { this.scheduler = scheduler; } + + public FSParentQueue getRootQueue() { + return rootQueue; + } public void initialize() throws IOException, SAXException, AllocationConfigurationException, ParserConfigurationException { FairSchedulerConfiguration conf = scheduler.getConf(); + rootQueue = new FSParentQueue("root", this, scheduler, null); + queues.put(rootQueue.getName(), rootQueue); + this.allocFile = conf.getAllocationFile(); if (allocFile == null) { // No allocation file specified in jobconf. Use the default allocation @@ -106,21 +120,106 @@ public void initialize() throws IOException, SAXException, lastSuccessfulReload = scheduler.getClock().getTime(); lastReloadAttempt = scheduler.getClock().getTime(); // Create the default queue - getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); + getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME); } - + /** - * Get a queue by name, creating it if necessary + * Get a queue by name, creating it if necessary. If the queue + * is not or can not be a leaf queue, i.e. it already exists as a parent queue, + * or one of the parents in its name is already a leaf queue, null is returned. + * + * The root part of the name is optional, so a queue underneath the root + * named "queue1" could be referred to as just "queue1", and a queue named + * "queue2" underneath a parent named "parent1" that is underneath the root + * could be referred to as just "parent1.queue2". */ - public FSQueue getQueue(String name) { + public FSLeafQueue getLeafQueue(String name) { + if (!name.startsWith(ROOT_QUEUE + ".")) { + name = ROOT_QUEUE + "." + name; + } synchronized (queues) { FSQueue queue = queues.get(name); if (queue == null) { - queue = new FSQueue(scheduler, name); - queue.setSchedulingMode(info.defaultSchedulingMode); - queues.put(name, queue); + FSLeafQueue leafQueue = createLeafQueue(name); + if (leafQueue == null) { + return null; + } + leafQueue.setSchedulingMode(info.defaultSchedulingMode); + queue = leafQueue; + } else if (queue instanceof FSParentQueue) { + return null; } - return queue; + return (FSLeafQueue)queue; + } + } + + /** + * Creates a leaf queue and places it in the tree. Creates any + * parents that don't already exist. + * + * @return + * the created queue, if successful. null if not allowed (one of the parent + * queues in the queue name is already a leaf queue) + */ + private FSLeafQueue createLeafQueue(String name) { + List newQueueNames = new ArrayList(); + newQueueNames.add(name); + int sepIndex = name.length(); + FSParentQueue parent = null; + + // Move up the queue tree until we reach one that exists. + while (sepIndex != -1) { + sepIndex = name.lastIndexOf('.', sepIndex-1); + FSQueue queue; + String curName = null; + curName = name.substring(0, sepIndex); + queue = queues.get(curName); + + if (queue == null) { + newQueueNames.add(curName); + } else { + if (queue instanceof FSParentQueue) { + parent = (FSParentQueue)queue; + break; + } else { + return null; + } + } + } + + // At this point, parent refers to the deepest existing parent of the + // queue to create. + // Now that we know everything worked out, make all the queues + // and add them to the map. + FSLeafQueue leafQueue = null; + for (int i = newQueueNames.size()-1; i >= 0; i--) { + String queueName = newQueueNames.get(i); + if (i == 0) { + // First name added was the leaf queue + leafQueue = new FSLeafQueue(name, this, scheduler, parent); + parent.addChildQueue(leafQueue); + queues.put(leafQueue.getName(), leafQueue); + leafQueues.add(leafQueue); + } else { + FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent); + parent.addChildQueue(newParent); + queues.put(newParent.getName(), newParent); + parent = newParent; + } + } + + return leafQueue; + } + + /** + * Gets a queue by name. + */ + public FSQueue getQueue(String name) { + if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) { + name = ROOT_QUEUE + "." + name; + } + synchronized (queues) { + return queues.get(name); } } @@ -136,8 +235,8 @@ public boolean exists(String name) { /** * Get the queue for a given AppSchedulable. */ - public FSQueue getQueueForApp(AppSchedulable app) { - return getQueue(app.getApp().getQueueName()); + public FSLeafQueue getQueueForApp(AppSchedulable app) { + return getLeafQueue(app.getApp().getQueueName()); } /** @@ -237,54 +336,9 @@ public void reloadAllocs() throws IOException, ParserConfigurationException, Element element = (Element)node; if ("queue".equals(element.getTagName()) || "pool".equals(element.getTagName())) { - String queueName = element.getAttribute("name"); - Map acls = - new HashMap(); - queueNamesInAllocFile.add(queueName); - NodeList fields = element.getChildNodes(); - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) - continue; - Element field = (Element) fieldNode; - if ("minResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - minQueueResources.put(queueName, Resources.createResource(val)); - } else if ("maxResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - maxQueueResources.put(queueName, Resources.createResource(val)); - } else if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - queueMaxApps.put(queueName, val); - } else if ("weight".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - double val = Double.parseDouble(text); - queueWeights.put(queueName, val); - } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - minSharePreemptionTimeouts.put(queueName, val); - } else if ("schedulingMode".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - queueModes.put(queueName, parseSchedulingMode(text)); - } else if ("aclSubmitApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); - } else if ("aclAdministerApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); - } - } - queueAcls.put(queueName, acls); - if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) - && Resources.lessThan(maxQueueResources.get(queueName), - minQueueResources.get(queueName))) { - LOG.warn(String.format("Queue %s has max resources %d less than min resources %d", - queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); - } + loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps, + userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts, + queueAcls, queueNamesInAllocFile); } else if ("user".equals(element.getTagName())) { String userName = element.getAttribute("name"); NodeList fields = element.getChildNodes(); @@ -331,7 +385,7 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) { queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); for (String name: queueNamesInAllocFile) { - FSQueue queue = getQueue(name); + FSLeafQueue queue = getLeafQueue(name); if (queueModes.containsKey(name)) { queue.setSchedulingMode(queueModes.get(name)); } else { @@ -340,6 +394,75 @@ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) { } } } + + /** + * Loads a queue from a queue element in the configuration file + */ + private void loadQueue(String parentName, Element element, Map minQueueResources, + Map maxQueueResources, Map queueMaxApps, + Map userMaxApps, Map queueWeights, + Map queueModes, Map minSharePreemptionTimeouts, + Map> queueAcls, List queueNamesInAllocFile) + throws AllocationConfigurationException { + String queueName = parentName + "." + element.getAttribute("name"); + Map acls = + new HashMap(); + NodeList fields = element.getChildNodes(); + boolean isLeaf = true; + + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) + continue; + Element field = (Element) fieldNode; + if ("minResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + minQueueResources.put(queueName, Resources.createResource(val)); + } else if ("maxResources".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + maxQueueResources.put(queueName, Resources.createResource(val)); + } else if ("maxRunningApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + int val = Integer.parseInt(text); + queueMaxApps.put(queueName, val); + } else if ("weight".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + double val = Double.parseDouble(text); + queueWeights.put(queueName, val); + } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + long val = Long.parseLong(text) * 1000L; + minSharePreemptionTimeouts.put(queueName, val); + } else if ("schedulingMode".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + queueModes.put(queueName, parseSchedulingMode(text)); + } else if ("aclSubmitApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); + } else if ("aclAdministerApps".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); + } else if ("queue".endsWith(field.getTagName()) || + "pool".equals(field.getTagName())) { + loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, + userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts, + queueAcls, queueNamesInAllocFile); + isLeaf = false; + } + } + if (isLeaf) { + queueNamesInAllocFile.add(queueName); + } + queueAcls.put(queueName, acls); + if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) + && Resources.lessThan(maxQueueResources.get(queueName), + minQueueResources.get(queueName))) { + LOG.warn(String.format("Queue %s has max resources %d less than min resources %d", + queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); + } + } private SchedulingMode parseSchedulingMode(String text) throws AllocationConfigurationException { @@ -384,9 +507,9 @@ public Resource getMaxResources(String queueName) { /** * Get a collection of all queues */ - public Collection getQueues() { + public Collection getLeafQueues() { synchronized (queues) { - return new ArrayList(queues.values()); + return leafQueues; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index c7f111aa2e5..cbcbd46f5f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -91,12 +91,6 @@ abstract class Schedulable { /** Refresh the Schedulable's demand and those of its children if any. */ public abstract void updateDemand(); - /** - * Distribute the fair share assigned to this Schedulable among its - * children (used in queues where the internal scheduler is fair sharing). - */ - public abstract void redistributeShare(); - /** * Assign a container on this node if possible, and return the amount of * resources assigned. If {@code reserved} is true, it means a reservation diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java index 0591683bd30..4fe19ca13d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java @@ -23,7 +23,7 @@ import java.util.List; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; public class FairSchedulerInfo { @@ -32,9 +32,9 @@ public class FairSchedulerInfo { public FairSchedulerInfo(FairScheduler fs) { scheduler = fs; - Collection queues = fs.getQueueManager().getQueues(); + Collection queues = fs.getQueueManager().getLeafQueues(); queueInfos = new ArrayList(); - for (FSQueue queue : queues) { + for (FSLeafQueue queue : queues) { queueInfos.add(new FairSchedulerQueueInfo(queue, fs)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index ab224087fee..35749427d00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -22,9 +22,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueSchedulable; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager; @@ -49,17 +48,16 @@ public class FairSchedulerQueueInfo { private String queueName; - public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { - Collection apps = queue.getApplications(); - for (FSSchedulerApp app : apps) { - if (app.isPending()) { + public FairSchedulerQueueInfo(FSLeafQueue queue, FairScheduler scheduler) { + Collection apps = queue.getAppSchedulables(); + for (AppSchedulable app : apps) { + if (app.getApp().isPending()) { numPendingApps++; } else { numActiveApps++; } } - FSQueueSchedulable schedulable = queue.getQueueSchedulable(); QueueManager manager = scheduler.getQueueManager(); queueName = queue.getName(); @@ -67,11 +65,11 @@ public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) { Resource clusterMax = scheduler.getClusterCapacity(); clusterMaxMem = clusterMax.getMemory(); - usedResources = schedulable.getResourceUsage(); + usedResources = queue.getResourceUsage(); fractionUsed = (float)usedResources.getMemory() / clusterMaxMem; - fairShare = schedulable.getFairShare().getMemory(); - minResources = schedulable.getMinShare(); + fairShare = queue.getFairShare().getMemory(); + minResources = queue.getMinShare(); minShare = minResources.getMemory(); maxResources = scheduler.getQueueManager().getMaxResources(queueName); if (maxResources.getMemory() > clusterMaxMem) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index 11f96f48c50..e75b62dd890 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -107,9 +107,6 @@ public Resource getMinShare() { return minShare; } - @Override - public void redistributeShare() {} - @Override public void updateDemand() {} } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java new file mode 100644 index 00000000000..cbad1564ffb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestFSLeafQueue { + private FSLeafQueue schedulable = null; + private Resource maxResource = Resources.createResource(10); + + @Before + public void setup() throws IOException { + FairScheduler scheduler = new FairScheduler(); + Configuration conf = createConfiguration(); + // All tests assume only one assignment per node update + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + RMStateStore store = StoreFactory.getStore(conf); + ResourceManager resourceManager = new ResourceManager(store); + resourceManager.init(conf); + ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + String queueName = "root.queue1"; + QueueManager mockMgr = mock(QueueManager.class); + when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource); + + schedulable = new FSLeafQueue(queueName, mockMgr, scheduler, null); + } + + @Test + public void testUpdateDemand() { + AppSchedulable app = mock(AppSchedulable.class); + Mockito.when(app.getDemand()).thenReturn(maxResource); + + schedulable.addAppSchedulable(app); + schedulable.addAppSchedulable(app); + + schedulable.updateDemand(); + + assertTrue("Demand is greater than max allowed ", + Resources.equals(schedulable.getDemand(), maxResource)); + } + + private Configuration createConfiguration() { + Configuration conf = new YarnConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + return conf; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSQueueSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSQueueSchedulable.java deleted file mode 100644 index 0fc7479d4d4..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSQueueSchedulable.java +++ /dev/null @@ -1,42 +0,0 @@ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import static org.junit.Assert.assertTrue; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -public class TestFSQueueSchedulable { - private FSQueueSchedulable schedulable = null; - private Resource maxResource = Resources.createResource(10); - - @Before - public void setup() { - String queueName = "testFSQueue"; - FSQueue mockQueue = mock(FSQueue.class); - when(mockQueue.getName()).thenReturn(queueName); - - QueueManager mockMgr = mock(QueueManager.class); - when(mockMgr.getMaxResources(queueName)).thenReturn(maxResource); - - schedulable = new FSQueueSchedulable(null, mockQueue, mockMgr, null, 0, 0); - } - - @Test - public void testUpdateDemand() { - AppSchedulable app = mock(AppSchedulable.class); - Mockito.when(app.getDemand()).thenReturn(maxResource); - - schedulable.addApp(app); - schedulable.addApp(app); - - schedulable.updateDemand(); - - assertTrue("Demand is greater than max allowed ", - Resources.equals(schedulable.getDemand(), maxResource)); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 3d99fc89bcc..56d247ff3bb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -31,6 +31,10 @@ import java.util.List; import java.util.Map; +import javax.xml.parsers.ParserConfigurationException; + +import junit.framework.Assert; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.Clock; @@ -61,6 +65,7 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.xml.sax.SAXException; public class TestFairScheduler { @@ -195,15 +200,64 @@ public void testSimpleFairShareCalculation() { scheduler.update(); - Collection queues = scheduler.getQueueManager().getQueues(); + Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); - for (FSQueue p : queues) { - if (p.getName() != "default") { - assertEquals(5120, p.getQueueSchedulable().getFairShare().getMemory()); + for (FSLeafQueue p : queues) { + if (!p.getName().equals("root.default")) { + assertEquals(5120, p.getFairShare().getMemory()); } } } + + @Test + public void testSimpleHierarchicalFairShareCalculation() { + // Add one big node (only care about aggregate capacity) + int capacity = 10 * 24; + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity)); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Have two queues which want entire cluster capacity + createSchedulingRequest(10 * 1024, "queue1", "user1"); + createSchedulingRequest(10 * 1024, "parent.queue2", "user1"); + createSchedulingRequest(10 * 1024, "parent.queue3", "user1"); + + scheduler.update(); + + QueueManager queueManager = scheduler.getQueueManager(); + Collection queues = queueManager.getLeafQueues(); + assertEquals(4, queues.size()); + + FSLeafQueue queue1 = queueManager.getLeafQueue("queue1"); + FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2"); + FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3"); + assertEquals(capacity / 2, queue1.getFairShare().getMemory()); + assertEquals(capacity / 4, queue2.getFairShare().getMemory()); + assertEquals(capacity / 4, queue3.getFairShare().getMemory()); + } + + @Test + public void testHierarchicalQueuesSimilarParents() { + QueueManager queueManager = scheduler.getQueueManager(); + FSLeafQueue leafQueue = queueManager.getLeafQueue("parent.child"); + Assert.assertEquals(2, queueManager.getLeafQueues().size()); + Assert.assertNotNull(leafQueue); + Assert.assertEquals("root.parent.child", leafQueue.getName()); + + FSLeafQueue leafQueue2 = queueManager.getLeafQueue("parent"); + Assert.assertNull(leafQueue2); + Assert.assertEquals(2, queueManager.getLeafQueues().size()); + + FSLeafQueue leafQueue3 = queueManager.getLeafQueue("parent.child.grandchild"); + Assert.assertNull(leafQueue3); + Assert.assertEquals(2, queueManager.getLeafQueues().size()); + + FSLeafQueue leafQueue4 = queueManager.getLeafQueue("parent.sister"); + Assert.assertNotNull(leafQueue4); + Assert.assertEquals("root.parent.sister", leafQueue4.getName()); + Assert.assertEquals(3, queueManager.getLeafQueues().size()); + } @Test public void testSimpleContainerAllocation() { @@ -228,14 +282,14 @@ public void testSimpleContainerAllocation() { // Asked for less than min_allocation. assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, scheduler.getQueueManager().getQueue("queue1"). - getQueueSchedulable().getResourceUsage().getMemory()); + getResourceUsage().getMemory()); NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2, new ArrayList(), new ArrayList()); scheduler.handle(updateEvent2); assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getQueueSchedulable().getResourceUsage().getMemory()); + getResourceUsage().getMemory()); } @Test @@ -254,7 +308,7 @@ public void testSimpleContainerReservation() throws InterruptedException { // Make sure queue 1 is allocated app capacity assertEquals(1024, scheduler.getQueueManager().getQueue("queue1"). - getQueueSchedulable().getResourceUsage().getMemory()); + getResourceUsage().getMemory()); // Now queue 2 requests likewise ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1); @@ -263,7 +317,7 @@ public void testSimpleContainerReservation() throws InterruptedException { // Make sure queue 2 is waiting with a reservation assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). - getQueueSchedulable().getResourceUsage().getMemory()); + getResourceUsage().getMemory()); assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); // Now another node checks in with capacity @@ -276,7 +330,7 @@ public void testSimpleContainerReservation() throws InterruptedException { // Make sure this goes to queue 2 assertEquals(1024, scheduler.getQueueManager().getQueue("queue2"). - getQueueSchedulable().getResourceUsage().getMemory()); + getResourceUsage().getMemory()); // The old reservation should still be there... assertEquals(1024, scheduler.applications.get(attId).getCurrentReservation().getMemory()); @@ -294,17 +348,22 @@ public void testUserAsDefaultQueue() throws Exception { AppAddedSchedulerEvent appAddedEvent = new AppAddedSchedulerEvent( createAppAttemptId(1, 1), "default", "user1"); scheduler.handle(appAddedEvent); - assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); - assertEquals(0, scheduler.getQueueManager().getQueue("default").getApplications().size()); + assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1") + .getAppSchedulables().size()); + assertEquals(0, scheduler.getQueueManager().getLeafQueue("default") + .getAppSchedulables().size()); conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); scheduler.reinitialize(conf, resourceManager.getRMContext()); AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent( createAppAttemptId(2, 1), "default", "user2"); scheduler.handle(appAddedEvent2); - assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); - assertEquals(1, scheduler.getQueueManager().getQueue("default").getApplications().size()); - assertEquals(0, scheduler.getQueueManager().getQueue("user2").getApplications().size()); + assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1") + .getAppSchedulables().size()); + assertEquals(1, scheduler.getQueueManager().getLeafQueue("default") + .getAppSchedulables().size()); + assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2") + .getAppSchedulables().size()); } @Test @@ -338,18 +397,17 @@ public void testFairShareWithMinAlloc() throws Exception { scheduler.update(); - Collection queues = scheduler.getQueueManager().getQueues(); + Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); - for (FSQueue p : queues) { - if (p.getName().equals("queueA")) { - assertEquals(1024, p.getQueueSchedulable().getFairShare().getMemory()); + for (FSLeafQueue p : queues) { + if (p.getName().equals("root.queueA")) { + assertEquals(1024, p.getFairShare().getMemory()); } - else if (p.getName().equals("queueB")) { - assertEquals(2048, p.getQueueSchedulable().getFairShare().getMemory()); + else if (p.getName().equals("root.queueB")) { + assertEquals(2048, p.getFairShare().getMemory()); } } - } /** @@ -358,11 +416,11 @@ else if (p.getName().equals("queueB")) { @Test public void testQueueDemandCalculation() throws Exception { ApplicationAttemptId id11 = createAppAttemptId(1, 1); - scheduler.addApplication(id11, "queue1", "user1"); + scheduler.addApplication(id11, "root.queue1", "user1"); ApplicationAttemptId id21 = createAppAttemptId(2, 1); - scheduler.addApplication(id21, "queue2", "user1"); + scheduler.addApplication(id21, "root.queue2", "user1"); ApplicationAttemptId id22 = createAppAttemptId(2, 2); - scheduler.addApplication(id22, "queue2", "user1"); + scheduler.addApplication(id22, "root.queue2", "user1"); int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB; @@ -388,10 +446,10 @@ public void testQueueDemandCalculation() throws Exception { scheduler.update(); - assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("queue1") - .getQueueSchedulable().getDemand().getMemory()); + assertEquals(2 * minReqSize, scheduler.getQueueManager().getQueue("root.queue1") + .getDemand().getMemory()); assertEquals(2 * minReqSize + 2 * minReqSize + (2 * minReqSize), scheduler - .getQueueManager().getQueue("queue2").getQueueSchedulable().getDemand() + .getQueueManager().getQueue("root.queue2").getDemand() .getMemory()); } @@ -402,10 +460,11 @@ public void testAppAdditionAndRemoval() throws Exception { scheduler.handle(appAddedEvent1); // Scheduler should have two queues (the default and the one created for user1) - assertEquals(2, scheduler.getQueueManager().getQueues().size()); + assertEquals(2, scheduler.getQueueManager().getLeafQueues().size()); // That queue should have one app - assertEquals(1, scheduler.getQueueManager().getQueue("user1").getApplications().size()); + assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1") + .getAppSchedulables().size()); AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent( createAppAttemptId(1, 1), RMAppAttemptState.FINISHED); @@ -414,7 +473,8 @@ public void testAppAdditionAndRemoval() throws Exception { scheduler.handle(appRemovedEvent1); // Queue should have no apps - assertEquals(0, scheduler.getQueueManager().getQueue("user1").getApplications().size()); + assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1") + .getAppSchedulables().size()); } @Test @@ -466,60 +526,98 @@ public void testAllocationFileParsing() throws Exception { QueueManager queueManager = scheduler.getQueueManager(); queueManager.initialize(); - assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue + assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue assertEquals(Resources.createResource(0), - queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(0), - queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(1024), - queueManager.getMinResources("queueA")); + queueManager.getMinResources("root.queueA")); assertEquals(Resources.createResource(2048), - queueManager.getMinResources("queueB")); + queueManager.getMinResources("root.queueB")); assertEquals(Resources.createResource(0), - queueManager.getMinResources("queueC")); + queueManager.getMinResources("root.queueC")); assertEquals(Resources.createResource(0), - queueManager.getMinResources("queueD")); + queueManager.getMinResources("root.queueD")); assertEquals(Resources.createResource(0), - queueManager.getMinResources("queueE")); + queueManager.getMinResources("root.queueE")); - assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(15, queueManager.getQueueMaxApps("queueA")); - assertEquals(15, queueManager.getQueueMaxApps("queueB")); - assertEquals(15, queueManager.getQueueMaxApps("queueC")); - assertEquals(3, queueManager.getQueueMaxApps("queueD")); - assertEquals(15, queueManager.getQueueMaxApps("queueE")); + assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(15, queueManager.getQueueMaxApps("root.queueA")); + assertEquals(15, queueManager.getQueueMaxApps("root.queueB")); + assertEquals(15, queueManager.getQueueMaxApps("root.queueC")); + assertEquals(3, queueManager.getQueueMaxApps("root.queueD")); + assertEquals(15, queueManager.getQueueMaxApps("root.queueE")); assertEquals(10, queueManager.getUserMaxApps("user1")); assertEquals(5, queueManager.getUserMaxApps("user2")); // Unspecified queues should get default ACL - Map aclsA = queueManager.getQueueAcls("queueA"); + Map aclsA = queueManager.getQueueAcls("root.queueA"); assertTrue(aclsA.containsKey(QueueACL.ADMINISTER_QUEUE)); assertEquals("*", aclsA.get(QueueACL.ADMINISTER_QUEUE).getAclString()); assertTrue(aclsA.containsKey(QueueACL.SUBMIT_APPLICATIONS)); assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); // Queue B ACL - Map aclsB = queueManager.getQueueAcls("queueB"); + Map aclsB = queueManager.getQueueAcls("root.queueB"); assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); // Queue c ACL - Map aclsC = queueManager.getQueueAcls("queueC"); + Map aclsC = queueManager.getQueueAcls("root.queueC"); assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout( + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); - assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE")); assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); } + @Test + public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException, + AllocationConfigurationException, ParserConfigurationException { + Configuration conf = createConfiguration(); + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("2048"); + out.println(""); + out.println(""); + out.println("2048"); + out.println(""); + out.println("2048"); + out.println(""); + out.println(""); + out.println("2048"); + out.println(""); + out.println(""); + out.println(""); + out.close(); + + QueueManager queueManager = scheduler.getQueueManager(); + queueManager.initialize(); + + Collection leafQueues = queueManager.getLeafQueues(); + Assert.assertEquals(4, leafQueues.size()); + Assert.assertNotNull(queueManager.getLeafQueue("queueA")); + Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueC")); + Assert.assertNotNull(queueManager.getLeafQueue("queueB.queueD")); + Assert.assertNotNull(queueManager.getLeafQueue("default")); + // Make sure querying for queues didn't create any new ones: + Assert.assertEquals(4, leafQueues.size()); + } + @Test public void testBackwardsCompatibleAllocationFileParsing() throws Exception { Configuration conf = createConfiguration(); @@ -569,29 +667,29 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { QueueManager queueManager = scheduler.getQueueManager(); queueManager.initialize(); - assertEquals(6, queueManager.getQueues().size()); // 5 in file + default queue + assertEquals(6, queueManager.getLeafQueues().size()); // 5 in file + default queue assertEquals(Resources.createResource(0), - queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(0), - queueManager.getMinResources(YarnConfiguration.DEFAULT_QUEUE_NAME)); + queueManager.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); assertEquals(Resources.createResource(1024), - queueManager.getMinResources("queueA")); + queueManager.getMinResources("root.queueA")); assertEquals(Resources.createResource(2048), - queueManager.getMinResources("queueB")); + queueManager.getMinResources("root.queueB")); assertEquals(Resources.createResource(0), - queueManager.getMinResources("queueC")); + queueManager.getMinResources("root.queueC")); assertEquals(Resources.createResource(0), - queueManager.getMinResources("queueD")); + queueManager.getMinResources("root.queueD")); assertEquals(Resources.createResource(0), - queueManager.getMinResources("queueE")); + queueManager.getMinResources("root.queueE")); - assertEquals(15, queueManager.getQueueMaxApps(YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(15, queueManager.getQueueMaxApps("queueA")); - assertEquals(15, queueManager.getQueueMaxApps("queueB")); - assertEquals(15, queueManager.getQueueMaxApps("queueC")); - assertEquals(3, queueManager.getQueueMaxApps("queueD")); - assertEquals(15, queueManager.getQueueMaxApps("queueE")); + assertEquals(15, queueManager.getQueueMaxApps("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); + assertEquals(15, queueManager.getQueueMaxApps("root.queueA")); + assertEquals(15, queueManager.getQueueMaxApps("root.queueB")); + assertEquals(15, queueManager.getQueueMaxApps("root.queueC")); + assertEquals(3, queueManager.getQueueMaxApps("root.queueD")); + assertEquals(15, queueManager.getQueueMaxApps("root.queueE")); assertEquals(10, queueManager.getUserMaxApps("user1")); assertEquals(5, queueManager.getUserMaxApps("user2")); @@ -603,23 +701,23 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { assertEquals("*", aclsA.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); // Queue B ACL - Map aclsB = queueManager.getQueueAcls("queueB"); + Map aclsB = queueManager.getQueueAcls("root.queueB"); assertTrue(aclsB.containsKey(QueueACL.ADMINISTER_QUEUE)); assertEquals("alice,bob admins", aclsB.get(QueueACL.ADMINISTER_QUEUE).getAclString()); // Queue c ACL - Map aclsC = queueManager.getQueueAcls("queueC"); + Map aclsC = queueManager.getQueueAcls("root.queueC"); assertTrue(aclsC.containsKey(QueueACL.SUBMIT_APPLICATIONS)); assertEquals("alice,bob admins", aclsC.get(QueueACL.SUBMIT_APPLICATIONS).getAclString()); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout( + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueB")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueC")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueD")); - assertEquals(120000, queueManager.getMinSharePreemptionTimeout("queueA")); - assertEquals(60000, queueManager.getMinSharePreemptionTimeout("queueE")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueB")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueC")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueD")); + assertEquals(120000, queueManager.getMinSharePreemptionTimeout("root.queueA")); + assertEquals(60000, queueManager.getMinSharePreemptionTimeout("root.queueE")); assertEquals(300000, queueManager.getFairSharePreemptionTimeout()); } @@ -659,25 +757,25 @@ public void testIsStarvedForMinShare() throws Exception { // Queue B arrives and wants 1 * 1024 createSchedulingRequest(1 * 1024, "queueB", "user1"); scheduler.update(); - Collection queues = scheduler.getQueueManager().getQueues(); + Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); // Queue A should be above min share, B below. - for (FSQueue p : queues) { - if (p.getName().equals("queueA")) { - assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); + for (FSLeafQueue p : queues) { + if (p.getName().equals("root.queueA")) { + assertEquals(false, scheduler.isStarvedForMinShare(p)); } - else if (p.getName().equals("queueB")) { - assertEquals(true, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); + else if (p.getName().equals("root.queueB")) { + assertEquals(true, scheduler.isStarvedForMinShare(p)); } } // Node checks in again, should allocate for B scheduler.handle(nodeEvent2); // Now B should have min share ( = demand here) - for (FSQueue p : queues) { - if (p.getName().equals("queueB")) { - assertEquals(false, scheduler.isStarvedForMinShare(p.getQueueSchedulable())); + for (FSLeafQueue p : queues) { + if (p.getName().equals("root.queueB")) { + assertEquals(false, scheduler.isStarvedForMinShare(p)); } } } @@ -718,16 +816,16 @@ public void testIsStarvedForFairShare() throws Exception { // Queue B arrives and wants 1 * 1024 createSchedulingRequest(1 * 1024, "queueB", "user1"); scheduler.update(); - Collection queues = scheduler.getQueueManager().getQueues(); + Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); // Queue A should be above fair share, B below. - for (FSQueue p : queues) { - if (p.getName().equals("queueA")) { - assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); + for (FSLeafQueue p : queues) { + if (p.getName().equals("root.queueA")) { + assertEquals(false, scheduler.isStarvedForFairShare(p)); } - else if (p.getName().equals("queueB")) { - assertEquals(true, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); + else if (p.getName().equals("root.queueB")) { + assertEquals(true, scheduler.isStarvedForFairShare(p)); } } @@ -735,9 +833,9 @@ else if (p.getName().equals("queueB")) { scheduler.handle(nodeEvent2); // B should not be starved for fair share, since entire demand is // satisfied. - for (FSQueue p : queues) { - if (p.getName().equals("queueB")) { - assertEquals(false, scheduler.isStarvedForFairShare(p.getQueueSchedulable())); + for (FSLeafQueue p : queues) { + if (p.getName().equals("root.queueB")) { + assertEquals(false, scheduler.isStarvedForFairShare(p)); } } } @@ -845,7 +943,7 @@ public void testChoiceOfPreemptedContainers() throws Exception { // We should be able to claw back one container from A and B each. // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueSchedulables(), + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(1, scheduler.applications.get(app2).getLiveContainers().size()); @@ -856,7 +954,7 @@ public void testChoiceOfPreemptedContainers() throws Exception { // We should be able to claw back another container from A and B each. // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueSchedulables(), + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); @@ -866,7 +964,7 @@ public void testChoiceOfPreemptedContainers() throws Exception { assertEquals(0, scheduler.applications.get(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything - scheduler.preemptResources(scheduler.getQueueSchedulables(), + scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), Resources.createResource(2 * 1024)); assertEquals(1, scheduler.applications.get(app1).getLiveContainers().size()); assertEquals(0, scheduler.applications.get(app2).getLiveContainers().size()); @@ -977,10 +1075,10 @@ public void testPreemptionDecision() throws Exception { scheduler.update(); - FSQueueSchedulable schedC = - scheduler.getQueueManager().getQueue("queueC").getQueueSchedulable(); - FSQueueSchedulable schedD = - scheduler.getQueueManager().getQueue("queueD").getQueueSchedulable(); + FSLeafQueue schedC = + scheduler.getQueueManager().getLeafQueue("queueC"); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD"); assertTrue(Resources.equals( Resources.none(), scheduler.resToPreempt(schedC, clock.getTime()))); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index b5da6bbd343..988c42dfe11 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -53,6 +53,22 @@ Hadoop MapReduce Next Generation - Fair Scheduler capacity between the running apps. queues can also be given weights to share the cluster non-proportionally in the config file. + The fair scheduler supports hierarchical queues. All queues descend from a + queue named "root". Available resources are distributed among the children + of the root queue in the typical fair scheduling fashion. Then, the children + distribute the resources assigned to them to their children in the same + fashion. Applications may only be scheduled on leaf queues. Queues can be + specified as children of other queues by placing them as sub-elements of + their parents in the fair scheduler configuration file. + + A queue's name starts with the names of its parents, with periods as + separators. So a queue named "queue1" under the root named, would be + referred to as "root.queue1", and a queue named "queue2" under a queue + named "parent1" would be referred to as "root.parent1.queue2". When + referring to queues, the root part of the name is optional, so queue1 could + be referred to as just "queue1", and a queue2 could be referred to as just + "parent1.queue2". + In addition to providing fair sharing, the Fair Scheduler allows assigning guaranteed minimum shares to queues, which is useful for ensuring that certain users, groups or production applications always get sufficient @@ -163,11 +179,14 @@ Allocation file format - 100000 - 900000 + 10000 + 90000 50 2.0 fair + + 5000 + 30