From 15d53942212ac0fe77e231cf0fe6ae249c612699 Mon Sep 17 00:00:00 2001 From: Alejandro Abdelnur Date: Thu, 18 Apr 2013 18:13:36 +0000 Subject: [PATCH] YARN-482. FS: Extend SchedulingMode to intermediate queues. (kkambatl via tucu) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1469511 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/AppSchedulable.java | 13 +- .../scheduler/fair/FSLeafQueue.java | 63 +++----- .../scheduler/fair/FSParentQueue.java | 42 ++++- .../scheduler/fair/FSQueue.java | 35 ++++- .../scheduler/fair/FSSchedulerNode.java | 8 +- .../scheduler/fair/FairScheduler.java | 43 ++---- .../scheduler/fair/QueueManager.java | 54 ++++--- .../scheduler/fair/Schedulable.java | 6 +- .../scheduler/fair/SchedulingMode.java | 118 -------------- .../scheduler/fair/SchedulingPolicy.java | 145 ++++++++++++++++++ .../FairSharePolicy.java} | 13 +- .../FifoPolicy.java} | 11 +- .../scheduler/fair/FakeSchedulable.java | 2 +- .../scheduler/fair/TestComputeFairShares.java | 6 +- .../scheduler/fair/TestFairScheduler.java | 81 +++++++++- .../scheduler/fair/TestSchedulingMode.java | 59 ------- .../scheduler/fair/TestSchedulingPolicy.java | 109 +++++++++++++ 18 files changed, 509 insertions(+), 302 deletions(-) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/{modes/FairSchedulingMode.java => policies/FairSharePolicy.java} (97%) rename hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/{modes/FifoSchedulingMode.java => policies/FifoPolicy.java} (93%) delete mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 16f231adf1b..475a5316ee0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -23,6 +23,9 @@ Release 2.0.5-beta - UNRELEASED NEW FEATURES + YARN-482. FS: Extend SchedulingMode to intermediate queues. + (kkambatl via tucu) + IMPROVEMENTS YARN-365. Change NM heartbeat handling to not generate a scheduler event diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 32328be5a68..fa01a0bb993 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -278,9 +278,7 @@ public class AppSchedulable extends Schedulable { } } - - @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { + private Resource assignContainer(FSSchedulerNode node, boolean reserved) { LOG.info("Node offered to app: " + getName() + " reserved: " + reserved); if (reserved) { @@ -345,4 +343,13 @@ public class AppSchedulable extends Schedulable { } return Resources.none(); } + + public Resource assignReservedContainer(FSSchedulerNode node) { + return assignContainer(node, true); + } + + @Override + public Resource assignContainer(FSSchedulerNode node) { + return assignContainer(node, false); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 2c8555005bd..ff5344d12a4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -40,9 +40,6 @@ public class FSLeafQueue extends FSQueue { private final List appScheds = new ArrayList(); - - /** Scheduling mode for jobs inside the queue (fair or FIFO) */ - private SchedulingMode schedulingMode; private final FairScheduler scheduler; private final QueueManager queueMgr; @@ -86,13 +83,18 @@ public class FSLeafQueue extends FSQueue { return appScheds; } - public void setSchedulingMode(SchedulingMode mode) { - this.schedulingMode = mode; + @Override + public void setPolicy(SchedulingPolicy policy) + throws AllocationConfigurationException { + if (!SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)) { + throwPolicyDoesnotApplyException(policy); + } + super.policy = policy; } @Override - public void recomputeFairShares() { - schedulingMode.computeShares(getAppSchedulables(), getFairShare()); + public void recomputeShares() { + policy.computeShares(getAppSchedulables(), getFairShare()); } @Override @@ -136,42 +138,27 @@ public class FSLeafQueue extends FSQueue { } @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { - LOG.debug("Node offered to queue: " + getName() + " reserved: " + reserved); - // If this queue is over its limit, reject - if (Resources.greaterThan(getResourceUsage(), - queueMgr.getMaxResources(getName()))) { - return Resources.none(); + public Resource assignContainer(FSSchedulerNode node) { + Resource assigned = Resources.none(); + if (LOG.isDebugEnabled()) { + LOG.debug("Node offered to queue: " + getName()); } - // If this node already has reserved resources for an app, first try to - // finish allocating resources for that app. - if (reserved) { - for (AppSchedulable sched : appScheds) { - if (sched.getApp().getApplicationAttemptId() == - node.getReservedContainer().getApplicationAttemptId()) { - return sched.assignContainer(node, reserved); + if (!assignContainerPreCheck(node)) { + return assigned; + } + + Comparator comparator = policy.getComparator(); + Collections.sort(appScheds, comparator); + for (AppSchedulable sched : appScheds) { + if (sched.getRunnable()) { + assigned = sched.assignContainer(node); + if (Resources.greaterThan(assigned, Resources.none())) { + break; } } - return Resources.none(); // We should never get here - } - - // Otherwise, chose app to schedule based on given policy. - else { - Comparator comparator = schedulingMode.getComparator(); - - Collections.sort(appScheds, comparator); - for (AppSchedulable sched: appScheds) { - if (sched.getRunnable()) { - Resource assignedResource = sched.assignContainer(node, reserved); - if (!assignedResource.equals(Resources.none())) { - return assignedResource; - } - } - } - - return Resources.none(); } + return assigned; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index dec5d888bd2..298ceeed056 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.List; import org.apache.commons.logging.Log; @@ -33,7 +34,6 @@ public class FSParentQueue extends FSQueue { private static final Log LOG = LogFactory.getLog( FSParentQueue.class.getName()); - private final List childQueues = new ArrayList(); private final QueueManager queueMgr; @@ -50,11 +50,11 @@ public class FSParentQueue extends FSQueue { } @Override - public void recomputeFairShares() { - SchedulingMode.getDefault().computeShares(childQueues, getFairShare()); + public void recomputeShares() { + policy.computeShares(childQueues, getFairShare()); for (FSQueue childQueue : childQueues) { childQueue.getMetrics().setAvailableResourcesToQueue(childQueue.getFairShare()); - childQueue.recomputeFairShares(); + childQueue.recomputeShares(); } } @@ -131,13 +131,41 @@ public class FSParentQueue extends FSQueue { } @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { - throw new IllegalStateException( - "Parent queue should not be assigned container"); + public Resource assignContainer(FSSchedulerNode node) { + Resource assigned = Resources.none(); + + // If this queue is over its limit, reject + if (Resources.greaterThan(getResourceUsage(), + queueMgr.getMaxResources(getName()))) { + return assigned; + } + + Collections.sort(childQueues, policy.getComparator()); + for (FSQueue child : childQueues) { + assigned = child.assignContainer(node); + if (node.getReservedContainer() != null + || Resources.greaterThan(assigned, Resources.none())) { + break; + } + } + return assigned; } @Override public Collection getChildQueues() { return childQueues; } + + @Override + public void setPolicy(SchedulingPolicy policy) + throws AllocationConfigurationException { + boolean allowed = + SchedulingPolicy.isApplicableTo(policy, (this == queueMgr + .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT + : SchedulingPolicy.DEPTH_INTERMEDIATE); + if (!allowed) { + throwPolicyDoesnotApplyException(policy); + } + super.policy = policy; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index dd164fcf979..0a03749d326 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -45,6 +45,8 @@ public abstract class FSQueue extends Schedulable implements Queue { protected final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); + protected SchedulingPolicy policy = SchedulingPolicy.getDefault(); + public FSQueue(String name, QueueManager queueMgr, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -63,6 +65,19 @@ public abstract class FSQueue extends Schedulable implements Queue { return name; } + public SchedulingPolicy getPolicy() { + return policy; + } + + protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy) + throws AllocationConfigurationException { + throw new AllocationConfigurationException("SchedulingPolicy " + policy + + " does not apply to queue " + getName()); + } + + public abstract void setPolicy(SchedulingPolicy policy) + throws AllocationConfigurationException; + @Override public double getWeight() { return queueMgr.getQueueWeight(getName()); @@ -130,13 +145,27 @@ public abstract class FSQueue extends Schedulable implements Queue { } /** - * Recomputes the fair shares for all queues and applications - * under this queue. + * Recomputes the shares for all child queues and applications based on this + * queue's current share */ - public abstract void recomputeFairShares(); + public abstract void recomputeShares(); /** * Gets the children of this queue, if any. */ public abstract Collection getChildQueues(); + + /** + * Helper method to check if the queue should attempt assigning resources + * + * @return true if check passes (can assign) or false otherwise + */ + protected boolean assignContainerPreCheck(FSSchedulerNode node) { + if (Resources.greaterThan(getResourceUsage(), + queueMgr.getMaxResources(getName())) + || node.getReservedContainer() != null) { + return false; + } + return true; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java index efc1c9f6842..7cde2fb6827 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerNode.java @@ -52,6 +52,7 @@ public class FSSchedulerNode extends SchedulerNode { private volatile int numContainers; private RMContainer reservedContainer; + private AppSchedulable reservedAppSchedulable; /* set of containers that are allocated containers */ private final Map launchedContainers = @@ -221,6 +222,7 @@ public class FSSchedulerNode extends SchedulerNode { " on node " + this + " for application " + application); } this.reservedContainer = reservedContainer; + this.reservedAppSchedulable = application.getAppSchedulable(); } public synchronized void unreserveResource( @@ -237,11 +239,15 @@ public class FSSchedulerNode extends SchedulerNode { " on node " + this); } - reservedContainer = null; + this.reservedContainer = null; + this.reservedAppSchedulable = null; } public synchronized RMContainer getReservedContainer() { return reservedContainer; } + public synchronized AppSchedulable getReservedAppSchedulable() { + return reservedAppSchedulable; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 238432e941c..df575860afd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -161,7 +161,7 @@ public class FairScheduler implements ResourceScheduler { protected boolean assignMultiple; // Allocate multiple containers per // heartbeat protected int maxAssign; // Max containers to assign per heartbeat - + public FairScheduler() { clock = new SystemClock(); queueMgr = new QueueManager(this); @@ -217,7 +217,7 @@ public class FairScheduler implements ResourceScheduler { rootQueue.setFairShare(clusterCapacity); // Recursively compute fair shares for all queues // and update metrics - rootQueue.recomputeFairShares(); + rootQueue.recomputeShares(); // Update recorded capacity of root queue (child queues are updated // when fair share is calculated). @@ -786,39 +786,24 @@ public class FairScheduler implements ResourceScheduler { // 1. Check for reserved applications // 2. Schedule if there are no reservations - // If we have have an application that has reserved a resource on this node - // already, we try to complete the reservation. - RMContainer reservedContainer = node.getReservedContainer(); - if (reservedContainer != null) { - FSSchedulerApp reservedApplication = - applications.get(reservedContainer.getApplicationAttemptId()); + AppSchedulable reservedAppSchedulable = node.getReservedAppSchedulable(); + if (reservedAppSchedulable != null) { + // Reservation exists; try to fulfill the reservation + LOG.info("Trying to fulfill reservation for application " + + reservedAppSchedulable.getApp().getApplicationAttemptId() + + " on node: " + nm); - // Try to fulfill the reservation - LOG.info("Trying to fulfill reservation for application " + - reservedApplication.getApplicationId() + " on node: " + nm); - - FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName()); - queue.assignContainer(node, true); + node.getReservedAppSchedulable().assignReservedContainer(node); } - - // Otherwise, schedule at queue which is furthest below fair share else { + // No reservation, schedule at queue which is farthest below fair share int assignedContainers = 0; while (node.getReservedContainer() == null) { - // At most one task is scheduled each iteration of this loop - List scheds = new ArrayList( - queueMgr.getLeafQueues()); - Collections.sort(scheds, SchedulingMode.getDefault().getComparator()); boolean assignedContainer = false; - for (FSLeafQueue sched : scheds) { - Resource assigned = sched.assignContainer(node, false); - if (Resources.greaterThan(assigned, Resources.none()) || - node.getReservedContainer() != null) { - eventLog.log("ASSIGN", nm.getHostName(), assigned); - assignedContainers++; - assignedContainer = true; - break; - } + if (Resources.greaterThan( + queueMgr.getRootQueue().assignContainer(node), + Resources.none())) { + assignedContainer = true; } if (!assignedContainer) { break; } if (!assignMultiple) { break; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index fc76d02c5fb..e4efb937e27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -143,7 +143,6 @@ public class QueueManager { if (leafQueue == null) { return null; } - leafQueue.setSchedulingMode(info.defaultSchedulingMode); queue = leafQueue; } else if (queue instanceof FSParentQueue) { return null; @@ -302,7 +301,7 @@ public class QueueManager { Map queueMaxApps = new HashMap(); Map userMaxApps = new HashMap(); Map queueWeights = new HashMap(); - Map queueModes = new HashMap(); + Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); Map> queueAcls = new HashMap>(); @@ -310,7 +309,7 @@ public class QueueManager { int queueMaxAppsDefault = Integer.MAX_VALUE; long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault(); + SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault(); // Remember all queue names so we can display them on web UI, etc. List queueNamesInAllocFile = new ArrayList(); @@ -339,7 +338,7 @@ public class QueueManager { if ("queue".equals(element.getTagName()) || "pool".equals(element.getTagName())) { loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts, + userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts, queueAcls, queueNamesInAllocFile); } else if ("user".equals(element.getTagName())) { String userName = element.getAttribute("name"); @@ -370,11 +369,12 @@ public class QueueManager { } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); - queueMaxAppsDefault = val;} - else if ("defaultQueueSchedulingMode".equals(element.getTagName())) { + queueMaxAppsDefault = val; + } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) + || "defaultQueueSchedulingMode".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); - SchedulingMode.setDefault(text); - defaultSchedulingMode = SchedulingMode.getDefault(); + SchedulingPolicy.setDefault(text); + defaultSchedPolicy = SchedulingPolicy.getDefault(); } else { LOG.warn("Bad element in allocations file: " + element.getTagName()); } @@ -385,7 +385,7 @@ public class QueueManager { synchronized (this) { info = new QueueManagerInfo(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault, - queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts, + queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts, queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout); // Root queue should have empty ACLs. As a queue's ACL is the union of @@ -396,14 +396,15 @@ public class QueueManager { rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" ")); rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" ")); queueAcls.put(ROOT_QUEUE, rootAcls); - + + // Create all queus for (String name: queueNamesInAllocFile) { - FSLeafQueue queue = getLeafQueue(name); - if (queueModes.containsKey(name)) { - queue.setSchedulingMode(queueModes.get(name)); - } else { - queue.setSchedulingMode(defaultSchedulingMode); - } + getLeafQueue(name); + } + + // Set custom policies as specified + for (Map.Entry entry : queuePolicies.entrySet()) { + queues.get(entry.getKey()).setPolicy(entry.getValue()); } } } @@ -414,7 +415,8 @@ public class QueueManager { private void loadQueue(String parentName, Element element, Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, Map queueWeights, - Map queueModes, Map minSharePreemptionTimeouts, + Map queuePolicies, + Map minSharePreemptionTimeouts, Map> queueAcls, List queueNamesInAllocFile) throws AllocationConfigurationException { String queueName = parentName + "." + element.getAttribute("name"); @@ -448,9 +450,10 @@ public class QueueManager { String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; minSharePreemptionTimeouts.put(queueName, val); - } else if ("schedulingMode".equals(field.getTagName())) { + } else if ("schedulingPolicy".equals(field.getTagName()) + || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); - queueModes.put(queueName, SchedulingMode.parse(text)); + queuePolicies.put(queueName, SchedulingPolicy.parse(text)); } else if ("aclSubmitApps".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text)); @@ -459,8 +462,9 @@ public class QueueManager { acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text)); } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { - loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, - userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts, + loadQueue(queueName, field, minQueueResources, maxQueueResources, + queueMaxApps, userMaxApps, queueWeights, queuePolicies, + minSharePreemptionTimeouts, queueAcls, queueNamesInAllocFile); isLeaf = false; } @@ -615,13 +619,13 @@ public class QueueManager { // below half its fair share for this long, it is allowed to preempt tasks. public final long fairSharePreemptionTimeout; - public final SchedulingMode defaultSchedulingMode; + public final SchedulingPolicy defaultSchedulingPolicy; public QueueManagerInfo(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, Map queueWeights, int userMaxAppsDefault, - int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode, + int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, Map> queueAcls, long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) { @@ -632,7 +636,7 @@ public class QueueManager { this.queueWeights = queueWeights; this.userMaxAppsDefault = userMaxAppsDefault; this.queueMaxAppsDefault = queueMaxAppsDefault; - this.defaultSchedulingMode = defaultSchedulingMode; + this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.queueAcls = queueAcls; this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; @@ -651,7 +655,7 @@ public class QueueManager { minSharePreemptionTimeouts = new HashMap(); defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeout = Long.MAX_VALUE; - defaultSchedulingMode = SchedulingMode.getDefault(); + defaultSchedulingPolicy = SchedulingPolicy.getDefault(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 1dc0630dd7a..c68e9d10bec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -93,11 +93,9 @@ public abstract class Schedulable { /** * Assign a container on this node if possible, and return the amount of - * resources assigned. If {@code reserved} is true, it means a reservation - * already exists on this node, and the schedulable should fulfill that - * reservation if possible. + * resources assigned. */ - public abstract Resource assignContainer(FSSchedulerNode node, boolean reserved); + public abstract Resource assignContainer(FSSchedulerNode node); /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java deleted file mode 100644 index 96948524a12..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java +++ /dev/null @@ -1,118 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import java.util.Collection; -import java.util.Comparator; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.hadoop.classification.InterfaceAudience.Public; -import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.util.ReflectionUtils; -import org.apache.hadoop.yarn.api.records.Resource; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; - -@Public -@Unstable -public abstract class SchedulingMode { - private static final ConcurrentHashMap, SchedulingMode> instances = - new ConcurrentHashMap, SchedulingMode>(); - - private static SchedulingMode DEFAULT_MODE = - getInstance(FairSchedulingMode.class); - - public static SchedulingMode getDefault() { - return DEFAULT_MODE; - } - - public static void setDefault(String className) - throws AllocationConfigurationException { - DEFAULT_MODE = parse(className); - } - - /** - * Returns a {@link SchedulingMode} instance corresponding to the passed clazz - */ - public static SchedulingMode getInstance(Class clazz) { - SchedulingMode mode = instances.get(clazz); - if (mode == null) { - mode = ReflectionUtils.newInstance(clazz, null); - instances.put(clazz, mode); - } - return mode; - } - - /** - * Returns {@link SchedulingMode} instance corresponding to the - * {@link SchedulingMode} passed as a string. The mode can be "fair" for - * FairSchedulingMode of "fifo" for FifoSchedulingMode. For custom - * {@link SchedulingMode}s in the RM classpath, the mode should be canonical - * class name of the {@link SchedulingMode}. - * - * @param mode canonical class name or "fair" or "fifo" - * @throws AllocationConfigurationException - */ - @SuppressWarnings("unchecked") - public static SchedulingMode parse(String mode) - throws AllocationConfigurationException { - @SuppressWarnings("rawtypes") - Class clazz; - String text = mode.toLowerCase(); - if (text.equals("fair")) { - clazz = FairSchedulingMode.class; - } else if (text.equals("fifo")) { - clazz = FifoSchedulingMode.class; - } else { - try { - clazz = Class.forName(mode); - } catch (ClassNotFoundException cnfe) { - throw new AllocationConfigurationException(mode - + " SchedulingMode class not found!"); - } - } - if (!SchedulingMode.class.isAssignableFrom(clazz)) { - throw new AllocationConfigurationException(mode - + " does not extend SchedulingMode"); - } - return getInstance(clazz); - } - - /** - * @return returns the name of SchedulingMode - */ - public abstract String getName(); - - /** - * The comparator returned by this method is to be used for sorting the - * {@link Schedulable}s in that queue. - * - * @return the comparator to sort by - */ - public abstract Comparator getComparator(); - - /** - * Computes and updates the shares of {@link Schedulable}s as per the - * SchedulingMode, to be used later at schedule time. - * - * @param schedulables {@link Schedulable}s whose shares are to be updated - * @param totalResources Total {@link Resource}s in the cluster - */ - public abstract void computeShares( - Collection schedulables, Resource totalResources); -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java new file mode 100644 index 00000000000..8e08a183ed1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; + +@Public +@Unstable +public abstract class SchedulingPolicy { + private static final ConcurrentHashMap, SchedulingPolicy> instances = + new ConcurrentHashMap, SchedulingPolicy>(); + + private static SchedulingPolicy DEFAULT_POLICY = + getInstance(FairSharePolicy.class); + + public static final byte DEPTH_LEAF = (byte) 1; + public static final byte DEPTH_INTERMEDIATE = (byte) 2; + public static final byte DEPTH_ROOT = (byte) 4; + public static final byte DEPTH_PARENT = (byte) 6; // Root and Intermediate + public static final byte DEPTH_ANY = (byte) 7; + + public static SchedulingPolicy getDefault() { + return DEFAULT_POLICY; + } + + public static void setDefault(String className) + throws AllocationConfigurationException { + DEFAULT_POLICY = parse(className); + } + + /** + * Returns a {@link SchedulingPolicy} instance corresponding to the passed clazz + */ + public static SchedulingPolicy getInstance(Class clazz) { + SchedulingPolicy policy = instances.get(clazz); + if (policy == null) { + policy = ReflectionUtils.newInstance(clazz, null); + instances.put(clazz, policy); + } + return policy; + } + + /** + * Returns {@link SchedulingPolicy} instance corresponding to the + * {@link SchedulingPolicy} passed as a string. The policy can be "fair" for + * FairsharePolicy or "fifo" for FifoPolicy. For custom + * {@link SchedulingPolicy}s in the RM classpath, the policy should be + * canonical class name of the {@link SchedulingPolicy}. + * + * @param policy canonical class name or "fair" or "fifo" + * @throws AllocationConfigurationException + */ + @SuppressWarnings("unchecked") + public static SchedulingPolicy parse(String policy) + throws AllocationConfigurationException { + @SuppressWarnings("rawtypes") + Class clazz; + String text = policy.toLowerCase(); + if (text.equals("fair")) { + clazz = FairSharePolicy.class; + } else if (text.equals("fifo")) { + clazz = FifoPolicy.class; + } else { + try { + clazz = Class.forName(policy); + } catch (ClassNotFoundException cnfe) { + throw new AllocationConfigurationException(policy + + " SchedulingPolicy class not found!"); + } + } + if (!SchedulingPolicy.class.isAssignableFrom(clazz)) { + throw new AllocationConfigurationException(policy + + " does not extend SchedulingPolicy"); + } + return getInstance(clazz); + } + + /** + * @return returns the name of {@link SchedulingPolicy} + */ + public abstract String getName(); + + /** + * Specifies the depths in the hierarchy, this {@link SchedulingPolicy} + * applies to + * + * @return depth equal to one of fields {@link SchedulingPolicy}#DEPTH_* + */ + public abstract byte getApplicableDepth(); + + /** + * Checks if the specified {@link SchedulingPolicy} can be used for a queue at + * the specified depth in the hierarchy + * + * @param policy {@link SchedulingPolicy} we are checking the + * depth-applicability for + * @param depth queue's depth in the hierarchy + * @return true if policy is applicable to passed depth, false otherwise + */ + public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) { + return ((policy.getApplicableDepth() & depth) == depth) ? true : false; + } + + /** + * The comparator returned by this method is to be used for sorting the + * {@link Schedulable}s in that queue. + * + * @return the comparator to sort by + */ + public abstract Comparator getComparator(); + + /** + * Computes and updates the shares of {@link Schedulable}s as per the + * {@link SchedulingPolicy}, to be used later at schedule time. + * + * @param schedulables {@link Schedulable}s whose shares are to be updated + * @param totalResources Total {@link Resource}s in the cluster + */ + public abstract void computeShares( + Collection schedulables, Resource totalResources); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java similarity index 97% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 56a37f48899..4e229315cc3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FairSchedulingMode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; import java.io.Serializable; import java.util.Collection; @@ -24,13 +24,13 @@ import java.util.Comparator; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import com.google.common.annotations.VisibleForTesting; -public class FairSchedulingMode extends SchedulingMode { +public class FairSharePolicy extends SchedulingPolicy { @VisibleForTesting - public static final String NAME = "FairShare"; + public static final String NAME = "Fairshare"; private FairShareComparator comparator = new FairShareComparator(); @Override @@ -211,4 +211,9 @@ public class FairSchedulingMode extends SchedulingMode { share = Math.min(share, sched.getDemand().getMemory()); return Resources.createResource((int) share); } + + @Override + public byte getApplicableDepth() { + return SchedulingPolicy.DEPTH_ANY; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java similarity index 93% rename from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java rename to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index f3c5e368dd6..25766ea6222 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/modes/FifoSchedulingMode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes; +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; import java.io.Serializable; import java.util.Collection; @@ -24,11 +24,11 @@ import java.util.Comparator; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import com.google.common.annotations.VisibleForTesting; -public class FifoSchedulingMode extends SchedulingMode { +public class FifoPolicy extends SchedulingPolicy { @VisibleForTesting public static final String NAME = "FIFO"; private FifoComparator comparator = new FifoComparator(); @@ -73,4 +73,9 @@ public class FifoSchedulingMode extends SchedulingMode { sched.setFairShare(Resources.createResource(0)); } } + + @Override + public byte getApplicableDepth() { + return SchedulingPolicy.DEPTH_LEAF; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index e75b62dd890..b910cb935ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -68,7 +68,7 @@ public class FakeSchedulable extends Schedulable { } @Override - public Resource assignContainer(FSSchedulerNode node, boolean reserved) { + public Resource assignContainer(FSSchedulerNode node) { return null; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java index 3a36fc15eaa..2a9dba5727d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java @@ -24,7 +24,7 @@ import java.util.List; import junit.framework.Assert; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.junit.Before; import org.junit.Test; @@ -33,12 +33,12 @@ import org.junit.Test; */ public class TestComputeFairShares { private List scheds; - private SchedulingMode schedulingMode; + private SchedulingPolicy schedulingMode; @Before public void setUp() throws Exception { scheds = new ArrayList(); - schedulingMode = new FairSchedulingMode(); + schedulingMode = new FairSharePolicy(); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 8bb4f3ff933..66bf6c99b8f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -73,7 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.util.BuilderUtils; import org.junit.After; import org.junit.Before; @@ -284,7 +284,7 @@ public class TestFairScheduler { assertEquals(capacity / 4, queue2.getFairShare().getMemory()); assertEquals(capacity / 4, queue3.getFairShare().getMemory()); } - + @Test public void testHierarchicalQueuesSimilarParents() { QueueManager queueManager = scheduler.getQueueManager(); @@ -1359,7 +1359,7 @@ public class TestFairScheduler { FSSchedulerApp app2 = scheduler.applications.get(attId2); FSLeafQueue queue1 = scheduler.getQueueManager().getLeafQueue("queue1"); - queue1.setSchedulingMode(new FifoSchedulingMode()); + queue1.setPolicy(new FifoPolicy()); scheduler.update(); @@ -1381,7 +1381,80 @@ public class TestFairScheduler { assertEquals(2, app1.getLiveContainers().size()); assertEquals(1, app2.getLiveContainers().size()); } - + + /** + * Test to verify the behavior of + * {@link FSQueue#assignContainer(FSSchedulerNode)}) + * + * Create two queues under root (fifoQueue and fairParent), and two queues + * under fairParent (fairChild1 and fairChild2). Submit two apps to the + * fifoQueue and one each to the fairChild* queues, all apps requiring 4 + * containers each of the total 16 container capacity + * + * Assert the number of containers for each app after 4, 8, 12 and 16 updates. + * + * @throws Exception + */ + @Test(timeout = 5000) + public void testAssignContainer() throws Exception { + final String user = "user1"; + final String fifoQueue = "fifo"; + final String fairParent = "fairParent"; + final String fairChild1 = fairParent + ".fairChild1"; + final String fairChild2 = fairParent + ".fairChild2"; + + RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(8192)); + RMNode node2 = MockNodes.newNodeInfo(1, Resources.createResource(8192)); + + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + + scheduler.handle(nodeEvent1); + scheduler.handle(nodeEvent2); + + ApplicationAttemptId attId1 = + createSchedulingRequest(1024, fifoQueue, user, 4); + ApplicationAttemptId attId2 = + createSchedulingRequest(1024, fairChild1, user, 4); + ApplicationAttemptId attId3 = + createSchedulingRequest(1024, fairChild2, user, 4); + ApplicationAttemptId attId4 = + createSchedulingRequest(1024, fifoQueue, user, 4); + + FSSchedulerApp app1 = scheduler.applications.get(attId1); + FSSchedulerApp app2 = scheduler.applications.get(attId2); + FSSchedulerApp app3 = scheduler.applications.get(attId3); + FSSchedulerApp app4 = scheduler.applications.get(attId4); + + scheduler.getQueueManager().getLeafQueue(fifoQueue) + .setPolicy(SchedulingPolicy.parse("fifo")); + scheduler.update(); + + NodeUpdateSchedulerEvent updateEvent1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent updateEvent2 = new NodeUpdateSchedulerEvent(node2); + + for (int i = 0; i < 8; i++) { + scheduler.handle(updateEvent1); + scheduler.handle(updateEvent2); + if ((i + 1) % 2 == 0) { + // 4 node updates: fifoQueue should have received 2, and fairChild* + // should have received one each + String ERR = + "Wrong number of assigned containers after " + (i + 1) + " updates"; + if (i < 4) { + // app1 req still not met + assertEquals(ERR, (i + 1), app1.getLiveContainers().size()); + assertEquals(ERR, 0, app4.getLiveContainers().size()); + } else { + // app1 req has been met, app4 should be served now + assertEquals(ERR, 4, app1.getLiveContainers().size()); + assertEquals(ERR, (i - 3), app4.getLiveContainers().size()); + } + assertEquals(ERR, (i + 1) / 2, app2.getLiveContainers().size()); + assertEquals(ERR, (i + 1) / 2, app3.getLiveContainers().size()); + } + } + } @SuppressWarnings("unchecked") @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java deleted file mode 100644 index 71d43a317d7..00000000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingMode.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; - -import static org.junit.Assert.assertTrue; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FairSchedulingMode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.modes.FifoSchedulingMode; -import org.junit.Test; - -public class TestSchedulingMode { - - @Test(timeout = 1000) - public void testParseSchedulingMode() throws AllocationConfigurationException { - - // Class name - SchedulingMode sm = SchedulingMode - .parse(FairSchedulingMode.class.getName()); - assertTrue("Invalid scheduler name", - sm.getName().equals(FairSchedulingMode.NAME)); - - // Canonical name - sm = SchedulingMode.parse(FairSchedulingMode.class - .getCanonicalName()); - assertTrue("Invalid scheduler name", - sm.getName().equals(FairSchedulingMode.NAME)); - - // Class - sm = SchedulingMode.getInstance(FairSchedulingMode.class); - assertTrue("Invalid scheduler name", - sm.getName().equals(FairSchedulingMode.NAME)); - - // Shortname - fair - sm = SchedulingMode.parse("fair"); - assertTrue("Invalid scheduler name", - sm.getName().equals(FairSchedulingMode.NAME)); - - // Shortname - fifo - sm = SchedulingMode.parse("fifo"); - assertTrue("Invalid scheduler name", - sm.getName().equals(FifoSchedulingMode.NAME)); - } -} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java new file mode 100644 index 00000000000..e498e7eea98 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestSchedulingPolicy { + + @Test(timeout = 1000) + public void testParseSchedulingPolicy() + throws AllocationConfigurationException { + + // Class name + SchedulingPolicy sm = SchedulingPolicy + .parse(FairSharePolicy.class.getName()); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSharePolicy.NAME)); + + // Canonical name + sm = SchedulingPolicy.parse(FairSharePolicy.class + .getCanonicalName()); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSharePolicy.NAME)); + + // Class + sm = SchedulingPolicy.getInstance(FairSharePolicy.class); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSharePolicy.NAME)); + + // Shortname - fair + sm = SchedulingPolicy.parse("fair"); + assertTrue("Invalid scheduler name", + sm.getName().equals(FairSharePolicy.NAME)); + + // Shortname - fifo + sm = SchedulingPolicy.parse("fifo"); + assertTrue("Invalid scheduler name", + sm.getName().equals(FifoPolicy.NAME)); + } + + /** + * Trivial tests that make sure + * {@link SchedulingPolicy#isApplicableTo(SchedulingPolicy, byte)} works as + * expected for the possible values of depth + * + * @throws AllocationConfigurationException + */ + @Test(timeout = 1000) + public void testIsApplicableTo() throws AllocationConfigurationException { + final String ERR = "Broken SchedulingPolicy#isApplicableTo"; + + // fifo + SchedulingPolicy policy = SchedulingPolicy.parse("fifo"); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); + assertFalse(ERR, SchedulingPolicy.isApplicableTo( + SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_INTERMEDIATE)); + assertFalse(ERR, SchedulingPolicy.isApplicableTo( + SchedulingPolicy.parse("fifo"), SchedulingPolicy.DEPTH_ROOT)); + + + // fair + policy = SchedulingPolicy.parse("fair"); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_LEAF)); + assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, + SchedulingPolicy.DEPTH_INTERMEDIATE)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); + + policy = Mockito.mock(SchedulingPolicy.class); + Mockito.when(policy.getApplicableDepth()).thenReturn( + SchedulingPolicy.DEPTH_PARENT); + assertTrue(ERR, SchedulingPolicy.isApplicableTo(policy, + SchedulingPolicy.DEPTH_INTERMEDIATE)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ROOT)); + assertTrue(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_PARENT)); + assertFalse(ERR, + SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); + } +}