diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 52d6e4004db..2f80ea3c431 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -41,6 +41,9 @@ Release 2.6.0 - UNRELEASED YARN-2395. FairScheduler: Preemption timeout should be configurable per queue. (Wei Yan via kasha) + YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. + (Wei Yan via kasha) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 228a761852a..de5a999c2dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -70,6 +70,12 @@ public class AllocationConfiguration { // allowed to preempt other jobs' tasks. private final Map fairSharePreemptionTimeouts; + // The fair share preemption threshold for each queue. If a queue waits + // fairSharePreemptionTimeout without receiving + // fairshare * fairSharePreemptionThreshold resources, it is allowed to + // preempt other queues' tasks. + private final Map fairSharePreemptionThresholds; + private final Map schedulingPolicies; private final SchedulingPolicy defaultSchedulingPolicy; @@ -92,6 +98,7 @@ public class AllocationConfiguration { SchedulingPolicy defaultSchedulingPolicy, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, + Map fairSharePreemptionThresholds, Map> queueAcls, QueuePlacementPolicy placementPolicy, Map> configuredQueues) { @@ -108,6 +115,7 @@ public class AllocationConfiguration { this.schedulingPolicies = schedulingPolicies; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; + this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; this.queueAcls = queueAcls; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; @@ -126,6 +134,7 @@ public class AllocationConfiguration { queueAcls = new HashMap>(); minSharePreemptionTimeouts = new HashMap(); fairSharePreemptionTimeouts = new HashMap(); + fairSharePreemptionThresholds = new HashMap(); schedulingPolicies = new HashMap(); defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; configuredQueues = new HashMap>(); @@ -171,7 +180,18 @@ public class AllocationConfiguration { return (fairSharePreemptionTimeout == null) ? -1 : fairSharePreemptionTimeout; } - + + /** + * Get a queue's fair share preemption threshold in the allocation file. + * Return -1f if not set. + */ + public float getFairSharePreemptionThreshold(String queueName) { + Float fairSharePreemptionThreshold = + fairSharePreemptionThresholds.get(queueName); + return (fairSharePreemptionThreshold == null) ? + -1f : fairSharePreemptionThreshold; + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 970ee9956de..c2dfc84a536 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -218,6 +218,8 @@ public class AllocationFileLoaderService extends AbstractService { Map queuePolicies = new HashMap(); Map minSharePreemptionTimeouts = new HashMap(); Map fairSharePreemptionTimeouts = new HashMap(); + Map fairSharePreemptionThresholds = + new HashMap(); Map> queueAcls = new HashMap>(); int userMaxAppsDefault = Integer.MAX_VALUE; @@ -225,6 +227,7 @@ public class AllocationFileLoaderService extends AbstractService { float queueMaxAMShareDefault = -1.0f; long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; + float defaultFairSharePreemptionThreshold = 0.5f; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; QueuePlacementPolicy newPlacementPolicy = null; @@ -277,7 +280,8 @@ public class AllocationFileLoaderService extends AbstractService { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); userMaxAppsDefault = val; - } else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultFairSharePreemptionTimeout" + .equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultFairSharePreemptionTimeout = val; @@ -287,10 +291,17 @@ public class AllocationFileLoaderService extends AbstractService { long val = Long.parseLong(text) * 1000L; defaultFairSharePreemptionTimeout = val; } - } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { + } else if ("defaultMinSharePreemptionTimeout" + .equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; defaultMinSharePreemptionTimeout = val; + } else if ("defaultFairSharePreemptionThreshold" + .equals(element.getTagName())) { + String text = ((Text)element.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + defaultFairSharePreemptionThreshold = val; } else if ("queueMaxAppsDefault".equals(element.getTagName())) { String text = ((Text)element.getFirstChild()).getData().trim(); int val = Integer.parseInt(text); @@ -326,7 +337,7 @@ public class AllocationFileLoaderService extends AbstractService { loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, - queueAcls, configuredQueues); + fairSharePreemptionThresholds, queueAcls, configuredQueues); } // Load placement policy and pass it configured queues @@ -349,11 +360,18 @@ public class AllocationFileLoaderService extends AbstractService { defaultFairSharePreemptionTimeout); } + // Set the fair share preemption threshold for the root queue + if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) { + fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE, + defaultFairSharePreemptionThreshold); + } + AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, - minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls, + minSharePreemptionTimeouts, fairSharePreemptionTimeouts, + fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues); lastSuccessfulReload = clock.getTime(); @@ -365,13 +383,15 @@ public class AllocationFileLoaderService extends AbstractService { /** * Loads a queue from a queue element in the configuration file */ - private void loadQueue(String parentName, Element element, Map minQueueResources, + private void loadQueue(String parentName, Element element, + Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, Map queueMaxAMShares, Map queueWeights, Map queuePolicies, Map minSharePreemptionTimeouts, Map fairSharePreemptionTimeouts, + Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues) throws AllocationConfigurationException { @@ -418,6 +438,11 @@ public class AllocationFileLoaderService extends AbstractService { String text = ((Text)field.getFirstChild()).getData().trim(); long val = Long.parseLong(text) * 1000L; fairSharePreemptionTimeouts.put(queueName, val); + } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + fairSharePreemptionThresholds.put(queueName, val); } else if ("schedulingPolicy".equals(field.getTagName()) || "schedulingMode".equals(field.getTagName())) { String text = ((Text)field.getFirstChild()).getData().trim(); @@ -434,7 +459,8 @@ public class AllocationFileLoaderService extends AbstractService { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, queueAcls, configuredQueues); + fairSharePreemptionTimeouts, fairSharePreemptionThresholds, + queueAcls, configuredQueues); configuredQueues.get(FSQueueType.PARENT).add(queueName); isLeaf = false; } @@ -449,11 +475,15 @@ public class AllocationFileLoaderService extends AbstractService { } } queueAcls.put(queueName, acls); - if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName) + if (maxQueueResources.containsKey(queueName) && + minQueueResources.containsKey(queueName) && !Resources.fitsIn(minQueueResources.get(queueName), maxQueueResources.get(queueName))) { - LOG.warn(String.format("Queue %s has max resources %s less than min resources %s", - queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName))); + LOG.warn( + String.format( + "Queue %s has max resources %s less than min resources %s", + queueName, maxQueueResources.get(queueName), + minQueueResources.get(queueName))); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 49e8ef06122..345ea8b7c36 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -54,7 +55,7 @@ public class FSLeafQueue extends FSQueue { // Variables used for preemption private long lastTimeAtMinShare; - private long lastTimeAtHalfFairShare; + private long lastTimeAtFairShareThreshold; // Track the AM resource usage for this queue private Resource amResourceUsage; @@ -65,7 +66,7 @@ public class FSLeafQueue extends FSQueue { FSParentQueue parent) { super(name, scheduler, parent); this.lastTimeAtMinShare = scheduler.getClock().getTime(); - this.lastTimeAtHalfFairShare = scheduler.getClock().getTime(); + this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime(); activeUsersManager = new ActiveUsersManager(getMetrics()); amResourceUsage = Resource.newInstance(0, 0); } @@ -275,16 +276,17 @@ public class FSLeafQueue extends FSQueue { return lastTimeAtMinShare; } - public void setLastTimeAtMinShare(long lastTimeAtMinShare) { + private void setLastTimeAtMinShare(long lastTimeAtMinShare) { this.lastTimeAtMinShare = lastTimeAtMinShare; } - public long getLastTimeAtHalfFairShare() { - return lastTimeAtHalfFairShare; + public long getLastTimeAtFairShareThreshold() { + return lastTimeAtFairShareThreshold; } - public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) { - this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare; + private void setLastTimeAtFairShareThreshold( + long lastTimeAtFairShareThreshold) { + this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold; } @Override @@ -328,6 +330,20 @@ public class FSLeafQueue extends FSQueue { // TODO Auto-generated method stub } + /** + * Update the preemption fields for the queue, i.e. the times since last was + * at its guaranteed share and over its fair share threshold. + */ + public void updateStarvationStats() { + long now = scheduler.getClock().getTime(); + if (!isStarvedForMinShare()) { + setLastTimeAtMinShare(now); + } + if (!isStarvedForFairShare()) { + setLastTimeAtFairShareThreshold(now); + } + } + /** * Helper method to check if the queue should preempt containers * @@ -337,4 +353,28 @@ public class FSLeafQueue extends FSQueue { return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(), getFairShare()); } + + /** + * Is a queue being starved for its min share. + */ + @VisibleForTesting + boolean isStarvedForMinShare() { + return isStarved(getMinShare()); + } + + /** + * Is a queue being starved for its fair share threshold. + */ + @VisibleForTesting + boolean isStarvedForFairShare() { + return isStarved( + Resources.multiply(getFairShare(), getFairSharePreemptionThreshold())); + } + + private boolean isStarved(Resource share) { + Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(), + scheduler.getClusterResource(), share, getDemand()); + return Resources.lessThan(FairScheduler.getResourceCalculator(), + scheduler.getClusterResource(), getResourceUsage(), desiredShare); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 1209970eccf..f74106a7da9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -78,11 +78,11 @@ public class FSParentQueue extends FSQueue { } @Override - public void updatePreemptionTimeouts() { - super.updatePreemptionTimeouts(); + public void updatePreemptionVariables() { + super.updatePreemptionVariables(); // For child queues for (FSQueue childQueue : childQueues) { - childQueue.updatePreemptionTimeouts(); + childQueue.updatePreemptionVariables(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index b9fcc4bbd97..d4e043d8850 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -54,6 +54,7 @@ public abstract class FSQueue implements Queue, Schedulable { private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; + private float fairSharePreemptionThreshold = 0.5f; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -186,6 +187,14 @@ public abstract class FSQueue implements Queue, Schedulable { this.minSharePreemptionTimeout = minSharePreemptionTimeout; } + public float getFairSharePreemptionThreshold() { + return fairSharePreemptionThreshold; + } + + public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) { + this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share @@ -193,21 +202,27 @@ public abstract class FSQueue implements Queue, Schedulable { public abstract void recomputeShares(); /** - * Update the min/fair share preemption timeouts for this queue. + * Update the min/fair share preemption timeouts and threshold for this queue. */ - public void updatePreemptionTimeouts() { - // For min share + public void updatePreemptionVariables() { + // For min share timeout minSharePreemptionTimeout = scheduler.getAllocationConfiguration() .getMinSharePreemptionTimeout(getName()); if (minSharePreemptionTimeout == -1 && parent != null) { minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout(); } - // For fair share + // For fair share timeout fairSharePreemptionTimeout = scheduler.getAllocationConfiguration() .getFairSharePreemptionTimeout(getName()); if (fairSharePreemptionTimeout == -1 && parent != null) { fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout(); } + // For fair share preemption threshold + fairSharePreemptionThreshold = scheduler.getAllocationConfiguration() + .getFairSharePreemptionThreshold(getName()); + if (fairSharePreemptionThreshold < 0 && parent != null) { + fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); + } } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2798b8d5f3e..a35e49f282e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -299,7 +299,7 @@ public class FairScheduler extends */ protected synchronized void update() { long start = getClock().getTime(); - updatePreemptionVariables(); // Determine if any queues merit preemption + updateStarvationStats(); // Determine if any queues merit preemption FSQueue rootQueue = queueMgr.getRootQueue(); @@ -329,48 +329,20 @@ public class FairScheduler extends /** * Update the preemption fields for all QueueScheduables, i.e. the times since - * each queue last was at its guaranteed share and at > 1/2 of its fair share - * for each type of task. + * each queue last was at its guaranteed share and over its fair share + * threshold for each type of task. */ - private void updatePreemptionVariables() { - long now = getClock().getTime(); - lastPreemptionUpdateTime = now; + private void updateStarvationStats() { + lastPreemptionUpdateTime = clock.getTime(); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - if (!isStarvedForMinShare(sched)) { - sched.setLastTimeAtMinShare(now); - } - if (!isStarvedForFairShare(sched)) { - sched.setLastTimeAtHalfFairShare(now); - } + sched.updateStarvationStats(); } } - /** - * Is a queue below its min share for the given task type? - */ - boolean isStarvedForMinShare(FSLeafQueue sched) { - Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource, - sched.getMinShare(), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), desiredShare); - } - - /** - * Is a queue being starved for fair share for the given task type? This is - * defined as being below half its fair share. - */ - boolean isStarvedForFairShare(FSLeafQueue sched) { - Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, - clusterResource, - Resources.multiply(sched.getFairShare(), .5), sched.getDemand()); - return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), desiredFairShare); - } - /** * Check for queues that need tasks preempted, either because they have been * below their guaranteed share for minSharePreemptionTimeout or they have - * been below half their fair share for the fairSharePreemptionTimeout. If + * been below their fair share threshold for the fairSharePreemptionTimeout. If * such queues exist, compute how many tasks of each type need to be preempted * and then select the right ones using preemptTasks. */ @@ -499,11 +471,11 @@ public class FairScheduler extends * Return the resource amount that this queue is allowed to preempt, if any. * If the queue has been below its min share for at least its preemption * timeout, it should preempt the difference between its current share and - * this min share. If it has been below half its fair share for at least the - * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its - * full fair share. If both conditions hold, we preempt the max of the two - * amounts (this shouldn't happen unless someone sets the timeouts to be - * identical for some reason). + * this min share. If it has been below its fair share preemption threshold + * for at least the fairSharePreemptionTimeout, it should preempt enough tasks + * to get up to its full fair share. If both conditions hold, we preempt the + * max of the two amounts (this shouldn't happen unless someone sets the + * timeouts to be identical for some reason). */ protected Resource resToPreempt(FSLeafQueue sched, long curTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); @@ -516,7 +488,7 @@ public class FairScheduler extends resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) { + if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, sched.getFairShare(), sched.getDemand()); resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, @@ -1094,7 +1066,11 @@ public class FairScheduler extends public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) { return super.getApplicationAttempt(appAttemptId); } - + + public static ResourceCalculator getResourceCalculator() { + return RESOURCE_CALCULATOR; + } + /** * Subqueue metrics might be a little out of date because fair shares are * recalculated at the update interval, but the root queue metrics needs to diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 2444ba422d1..61b3b6c325f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -181,7 +181,7 @@ public class QueueManager { parent.addChildQueue(leafQueue); queues.put(leafQueue.getName(), leafQueue); leafQueues.add(leafQueue); - setPreemptionTimeout(leafQueue, parent, queueConf); + leafQueue.updatePreemptionVariables(); return leafQueue; } else { FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); @@ -193,7 +193,7 @@ public class QueueManager { } parent.addChildQueue(newParent); queues.put(newParent.getName(), newParent); - setPreemptionTimeout(newParent, parent, queueConf); + newParent.updatePreemptionVariables(); parent = newParent; } } @@ -201,29 +201,6 @@ public class QueueManager { return parent; } - /** - * Set the min/fair share preemption timeouts for the given queue. - * If the timeout is configured in the allocation file, the queue will use - * that value; otherwise, the queue inherits the value from its parent queue. - */ - private void setPreemptionTimeout(FSQueue queue, - FSParentQueue parentQueue, AllocationConfiguration queueConf) { - // For min share - long minSharePreemptionTimeout = - queueConf.getMinSharePreemptionTimeout(queue.getQueueName()); - if (minSharePreemptionTimeout == -1) { - minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout(); - } - queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout); - // For fair share - long fairSharePreemptionTimeout = - queueConf.getFairSharePreemptionTimeout(queue.getQueueName()); - if (fairSharePreemptionTimeout == -1) { - fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout(); - } - queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout); - } - /** * Make way for the given queue if possible, by removing incompatible * queues with no apps in them. Incompatibility could be due to @@ -409,7 +386,8 @@ public class QueueManager { // Update steady fair shares for all queues rootQueue.recomputeSteadyShares(); - // Update the fair share preemption timeouts for all queues recursively - rootQueue.updatePreemptionTimeouts(); + // Update the fair share preemption timeouts and preemption for all queues + // recursively + rootQueue.updatePreemptionVariables(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index 14b3111c07f..656e20d4c7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -187,13 +187,15 @@ public class TestAllocationFileLoaderService { out.println(""); out.println(""); // Create hierarchical queues G,H, with different min/fair share preemption - // timeouts + // timeouts and preemption thresholds out.println(""); out.println("120"); out.println("50"); + out.println("0.6"); out.println(" "); out.println(" 180"); out.println(" 40"); + out.println(" 0.7"); out.println(" "); out.println(""); // Set default limit of apps per queue to 15 @@ -211,6 +213,8 @@ public class TestAllocationFileLoaderService { + ""); // Set default fair share preemption timeout to 5 minutes out.println("300"); + // Set default fair share preemption threshold to 0.4 + out.println("0.4"); // Set default scheduling policy to DRF out.println("drf"); out.println(""); @@ -299,6 +303,26 @@ public class TestAllocationFileLoaderService { assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG")); assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH")); + assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01); + assertEquals(.6f, + queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01); + assertEquals(.7f, + queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01); + assertTrue(queueConf.getConfiguredQueues() .get(FSQueueType.PARENT) .contains("root.queueF")); @@ -346,9 +370,10 @@ public class TestAllocationFileLoaderService { out.println(""); out.println("3"); out.println(""); - // Give queue E a preemption timeout of one minute + // Give queue E a preemption timeout of one minute and 0.3f threshold out.println(""); out.println("60"); + out.println("0.3"); out.println(""); // Set default limit of apps per queue to 15 out.println("15"); @@ -363,6 +388,8 @@ public class TestAllocationFileLoaderService { + ""); // Set fair share preemption timeout to 5 minutes out.println("300"); + // Set default fair share preemption threshold to 0.6f + out.println("0.6"); out.println(""); out.close(); @@ -429,6 +456,20 @@ public class TestAllocationFileLoaderService { assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC")); assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD")); assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE")); + + assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01); + assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." + + YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01); + assertEquals(-1, + queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01); + assertEquals(.3f, + queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 7323b6ab050..97736bedd04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -18,50 +18,66 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.io.File; +import java.io.FileWriter; import java.io.IOException; +import java.io.PrintWriter; +import java.util.Collection; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -public class TestFSLeafQueue { - private FSLeafQueue schedulable = null; - private Resource maxResource = Resources.createResource(10); +public class TestFSLeafQueue extends FairSchedulerTestBase { + private final static String ALLOC_FILE = new File(TEST_DIR, + TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath(); + private Resource maxResource = Resources.createResource(1024 * 8); @Before public void setup() throws IOException { - FairScheduler scheduler = new FairScheduler(); - Configuration conf = createConfiguration(); - // All tests assume only one assignment per node update - conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); - ResourceManager resourceManager = new ResourceManager(); - resourceManager.init(conf); - ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start(); - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - String queueName = "root.queue1"; - scheduler.allocConf = mock(AllocationConfiguration.class); - when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); - when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); + conf = createConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, + ResourceScheduler.class); + } - schedulable = new FSLeafQueue(queueName, scheduler, null); + @After + public void teardown() { + if (resourceManager != null) { + resourceManager.stop(); + resourceManager = null; + } + conf = null; } @Test public void testUpdateDemand() { + conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + scheduler.allocConf = mock(AllocationConfiguration.class); + + String queueName = "root.queue1"; + when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource); + when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none()); + FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null); + FSAppAttempt app = mock(FSAppAttempt.class); Mockito.when(app.getDemand()).thenReturn(maxResource); @@ -73,11 +89,137 @@ public class TestFSLeafQueue { assertTrue("Demand is greater than max allowed ", Resources.equals(schedulable.getDemand(), maxResource)); } - - private Configuration createConfiguration() { - Configuration conf = new YarnConfiguration(); - conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class, - ResourceScheduler.class); - return conf; + + @Test (timeout = 5000) + public void test() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println(""); + out.println("2048mb,0vcores"); + out.println(""); + out.println(""); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 3 * 1024. Node update gives this all to A + createSchedulingRequest(3 * 1024, "queueA", "user1"); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + + // Queue B arrives and wants 1 * 1024 + createSchedulingRequest(1 * 1024, "queueB", "user1"); + scheduler.update(); + Collection queues = scheduler.getQueueManager().getLeafQueues(); + assertEquals(3, queues.size()); + + // Queue A should be above min share, B below. + FSLeafQueue queueA = + scheduler.getQueueManager().getLeafQueue("queueA", false); + FSLeafQueue queueB = + scheduler.getQueueManager().getLeafQueue("queueB", false); + assertFalse(queueA.isStarvedForMinShare()); + assertTrue(queueB.isStarvedForMinShare()); + + // Node checks in again, should allocate for B + scheduler.handle(nodeEvent2); + // Now B should have min share ( = demand here) + assertFalse(queueB.isStarvedForMinShare()); + } + + @Test (timeout = 5000) + public void testIsStarvedForFairShare() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(".2"); + out.println(""); + out.println(""); + out.println(".8"); + out.println(".4"); + out.println(""); + out.println(""); + out.println(""); + out.println(".6"); + out.println(""); + out.println(""); + out.println(".5"); + out.println(""); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 4 * 1024. Node update gives this all to A + createSchedulingRequest(1 * 1024, "queueA", "user1", 4); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } + + QueueManager queueMgr = scheduler.getQueueManager(); + FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); + assertEquals(4 * 1024, queueA.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 want 3 * 1024 + createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3); + createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3); + scheduler.update(); + for (int i = 0; i < 4; i ++) { + scheduler.handle(nodeEvent2); + } + + FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false); + FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false); + assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory()); + + // For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share + // threshold is 1.6 * 1024 + assertFalse(queueB1.isStarvedForFairShare()); + + // For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share + // threshold is 2.4 * 1024 + assertTrue(queueB2.isStarvedForFairShare()); + + // Node checks in again + scheduler.handle(nodeEvent2); + scheduler.handle(nodeEvent2); + assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory()); + assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory()); + + // Both queue B1 and queue B2 usages go to 3 * 1024 + assertFalse(queueB1.isStarvedForFairShare()); + assertFalse(queueB2.isStarvedForFairShare()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 836102534a1..a14a38ec706 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1061,9 +1061,11 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(" "); out.println(" 100"); out.println(" 120"); + out.println(" .5"); out.println(""); out.println("300"); out.println("200"); + out.println(".6"); out.println(""); out.close(); @@ -1080,125 +1082,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { assertEquals(100000, root.getFairSharePreemptionTimeout()); assertEquals(120000, root.getMinSharePreemptionTimeout()); - } - - @Test (timeout = 5000) - public void testIsStarvedForMinShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); - out.println(""); - out.println("2048mb,0vcores"); - out.println(""); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); - - // Queue A should be above min share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForMinShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForMinShare(p)); - } - } - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // Now B should have min share ( = demand here) - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForMinShare(p)); - } - } - } - - @Test (timeout = 5000) - public void testIsStarvedForFairShare() throws Exception { - conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - out.println(""); - out.println(".25"); - out.println(""); - out.println(""); - out.println(".75"); - out.println(""); - out.println(""); - out.close(); - - scheduler.init(conf); - scheduler.start(); - scheduler.reinitialize(conf, resourceManager.getRMContext()); - - // Add one big node (only care about aggregate capacity) - RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, - "127.0.0.1"); - NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); - scheduler.handle(nodeEvent1); - - // Queue A wants 3 * 1024. Node update gives this all to A - createSchedulingRequest(3 * 1024, "queueA", "user1"); - scheduler.update(); - NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); - scheduler.handle(nodeEvent2); - - // Queue B arrives and wants 1 * 1024 - createSchedulingRequest(1 * 1024, "queueB", "user1"); - scheduler.update(); - Collection queues = scheduler.getQueueManager().getLeafQueues(); - assertEquals(3, queues.size()); - - // Queue A should be above fair share, B below. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueA")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - else if (p.getName().equals("root.queueB")) { - assertEquals(true, scheduler.isStarvedForFairShare(p)); - } - } - - // Node checks in again, should allocate for B - scheduler.handle(nodeEvent2); - // B should not be starved for fair share, since entire demand is - // satisfied. - for (FSLeafQueue p : queues) { - if (p.getName().equals("root.queueB")) { - assertEquals(false, scheduler.isStarvedForFairShare(p)); - } - } + assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01); } @Test (timeout = 5000) @@ -1385,7 +1269,8 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println("2"); out.println(""); - out.print("10"); + out.println("10"); + out.println(".5"); out.println(""); out.close(); @@ -1468,8 +1353,9 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(".25"); out.println("1024mb,0vcores"); out.println(""); - out.print("5"); - out.print("10"); + out.println("5"); + out.println("10"); + out.println(".5"); out.println(""); out.close(); @@ -1753,8 +1639,6 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); - MockClock clock = new MockClock(); - scheduler.setClock(clock); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -1842,6 +1726,32 @@ public class TestFairScheduler extends FairSchedulerTestBase { .getFairSharePreemptionTimeout()); } + @Test + public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception { + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Set preemption variables for the root queue + FSParentQueue root = scheduler.getQueueManager().getRootQueue(); + root.setMinSharePreemptionTimeout(10000); + root.setFairSharePreemptionTimeout(15000); + root.setFairSharePreemptionThreshold(.6f); + + // User1 submits one application + ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1); + createApplicationWithAMResource(appAttemptId, "default", "user1", null); + + // The user1 queue should inherit the configurations from the root queue + FSLeafQueue userQueue = + scheduler.getQueueManager().getLeafQueue("user1", true); + assertEquals(1, userQueue.getRunnableAppSchedulables().size()); + assertEquals(10000, userQueue.getMinSharePreemptionTimeout()); + assertEquals(15000, userQueue.getFairSharePreemptionTimeout()); + assertEquals(.6f, userQueue.getFairSharePreemptionThreshold(), 0.001); + } + @Test (timeout = 5000) public void testMultipleContainersWaitingForReservation() throws IOException { scheduler.init(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm index 0b2ebe73c6a..d1ce36a1f59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm @@ -277,6 +277,12 @@ Allocation file format threshold before it will try to preempt containers to take resources from other queues. If not set, the queue will inherit the value from its parent queue. + * fairSharePreemptionThreshold: the fair share preemption threshold for the + queue. If the queue waits fairSharePreemptionTimeout without receiving + fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt + containers to take resources from other queues. If not set, the queue will + inherit the value from its parent queue. + * <>, which represent settings governing the behavior of individual users. They can contain a single property: maxRunningApps, a limit on the number of running apps for a particular user. @@ -292,6 +298,10 @@ Allocation file format preemption timeout for the root queue; overridden by minSharePreemptionTimeout element in root queue. + * <>, which sets the fair share + preemption threshold for the root queue; overridden by fairSharePreemptionThreshold + element in root queue. + * <>, which sets the default running app limit for queues; overriden by maxRunningApps element in each queue.