YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue. (Wei Yan via kasha)

(cherry picked from commit 1dcaba9a7a)
This commit is contained in:
Karthik Kambatla 2014-09-03 10:27:36 -07:00
parent 9855e884cb
commit 96a13c6d0c
12 changed files with 412 additions and 247 deletions

View File

@ -41,6 +41,9 @@ Release 2.6.0 - UNRELEASED
YARN-2395. FairScheduler: Preemption timeout should be configurable per
queue. (Wei Yan via kasha)
YARN-2394. FairScheduler: Configure fairSharePreemptionThreshold per queue.
(Wei Yan via kasha)
IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -70,6 +70,12 @@ public class AllocationConfiguration {
// allowed to preempt other jobs' tasks.
private final Map<String, Long> fairSharePreemptionTimeouts;
// The fair share preemption threshold for each queue. If a queue waits
// fairSharePreemptionTimeout without receiving
// fairshare * fairSharePreemptionThreshold resources, it is allowed to
// preempt other queues' tasks.
private final Map<String, Float> fairSharePreemptionThresholds;
private final Map<String, SchedulingPolicy> schedulingPolicies;
private final SchedulingPolicy defaultSchedulingPolicy;
@ -92,6 +98,7 @@ public class AllocationConfiguration {
SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues) {
@ -108,6 +115,7 @@ public class AllocationConfiguration {
this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
this.fairSharePreemptionThresholds = fairSharePreemptionThresholds;
this.queueAcls = queueAcls;
this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues;
@ -126,6 +134,7 @@ public class AllocationConfiguration {
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionThresholds = new HashMap<String, Float>();
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
configuredQueues = new HashMap<FSQueueType, Set<String>>();
@ -172,6 +181,17 @@ public class AllocationConfiguration {
-1 : fairSharePreemptionTimeout;
}
/**
* Get a queue's fair share preemption threshold in the allocation file.
* Return -1f if not set.
*/
public float getFairSharePreemptionThreshold(String queueName) {
Float fairSharePreemptionThreshold =
fairSharePreemptionThresholds.get(queueName);
return (fairSharePreemptionThreshold == null) ?
-1f : fairSharePreemptionThreshold;
}
public ResourceWeights getQueueWeight(String queue) {
ResourceWeights weight = queueWeights.get(queue);
return (weight == null) ? ResourceWeights.NEUTRAL : weight;

View File

@ -218,6 +218,8 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Float> fairSharePreemptionThresholds =
new HashMap<String, Float>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
@ -225,6 +227,7 @@ public class AllocationFileLoaderService extends AbstractService {
float queueMaxAMShareDefault = -1.0f;
long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
float defaultFairSharePreemptionThreshold = 0.5f;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
QueuePlacementPolicy newPlacementPolicy = null;
@ -277,7 +280,8 @@ public class AllocationFileLoaderService extends AbstractService {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxAppsDefault = val;
} else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) {
} else if ("defaultFairSharePreemptionTimeout"
.equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultFairSharePreemptionTimeout = val;
@ -287,10 +291,17 @@ public class AllocationFileLoaderService extends AbstractService {
long val = Long.parseLong(text) * 1000L;
defaultFairSharePreemptionTimeout = val;
}
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
} else if ("defaultMinSharePreemptionTimeout"
.equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultMinSharePreemptionTimeout = val;
} else if ("defaultFairSharePreemptionThreshold"
.equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
float val = Float.parseFloat(text);
val = Math.max(Math.min(val, 1.0f), 0.0f);
defaultFairSharePreemptionThreshold = val;
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@ -326,7 +337,7 @@ public class AllocationFileLoaderService extends AbstractService {
loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
queueAcls, configuredQueues);
fairSharePreemptionThresholds, queueAcls, configuredQueues);
}
// Load placement policy and pass it configured queues
@ -349,11 +360,18 @@ public class AllocationFileLoaderService extends AbstractService {
defaultFairSharePreemptionTimeout);
}
// Set the fair share preemption threshold for the root queue
if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) {
fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE,
defaultFairSharePreemptionThreshold);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls,
minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
fairSharePreemptionThresholds, queueAcls,
newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime();
@ -365,13 +383,15 @@ public class AllocationFileLoaderService extends AbstractService {
/**
* Loads a queue from a queue element in the configuration file
*/
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
private void loadQueue(String parentName, Element element,
Map<String, Resource> minQueueResources,
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
Map<String, Integer> userMaxApps, Map<String, Float> queueMaxAMShares,
Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Float> fairSharePreemptionThresholds,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException {
@ -418,6 +438,11 @@ public class AllocationFileLoaderService extends AbstractService {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeouts.put(queueName, val);
} else if ("fairSharePreemptionThreshold".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
float val = Float.parseFloat(text);
val = Math.max(Math.min(val, 1.0f), 0.0f);
fairSharePreemptionThresholds.put(queueName, val);
} else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
@ -434,7 +459,8 @@ public class AllocationFileLoaderService extends AbstractService {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts,
fairSharePreemptionTimeouts, queueAcls, configuredQueues);
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
queueAcls, configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}
@ -449,11 +475,15 @@ public class AllocationFileLoaderService extends AbstractService {
}
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
if (maxQueueResources.containsKey(queueName) &&
minQueueResources.containsKey(queueName)
&& !Resources.fitsIn(minQueueResources.get(queueName),
maxQueueResources.get(queueName))) {
LOG.warn(String.format("Queue %s has max resources %s less than min resources %s",
queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
LOG.warn(
String.format(
"Queue %s has max resources %s less than min resources %s",
queueName, maxQueueResources.get(queueName),
minQueueResources.get(queueName)));
}
}

View File

@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -54,7 +55,7 @@ public class FSLeafQueue extends FSQueue {
// Variables used for preemption
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
private long lastTimeAtFairShareThreshold;
// Track the AM resource usage for this queue
private Resource amResourceUsage;
@ -65,7 +66,7 @@ public class FSLeafQueue extends FSQueue {
FSParentQueue parent) {
super(name, scheduler, parent);
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
this.lastTimeAtFairShareThreshold = scheduler.getClock().getTime();
activeUsersManager = new ActiveUsersManager(getMetrics());
amResourceUsage = Resource.newInstance(0, 0);
}
@ -275,16 +276,17 @@ public class FSLeafQueue extends FSQueue {
return lastTimeAtMinShare;
}
public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
private void setLastTimeAtMinShare(long lastTimeAtMinShare) {
this.lastTimeAtMinShare = lastTimeAtMinShare;
}
public long getLastTimeAtHalfFairShare() {
return lastTimeAtHalfFairShare;
public long getLastTimeAtFairShareThreshold() {
return lastTimeAtFairShareThreshold;
}
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
private void setLastTimeAtFairShareThreshold(
long lastTimeAtFairShareThreshold) {
this.lastTimeAtFairShareThreshold = lastTimeAtFairShareThreshold;
}
@Override
@ -328,6 +330,20 @@ public class FSLeafQueue extends FSQueue {
// TODO Auto-generated method stub
}
/**
* Update the preemption fields for the queue, i.e. the times since last was
* at its guaranteed share and over its fair share threshold.
*/
public void updateStarvationStats() {
long now = scheduler.getClock().getTime();
if (!isStarvedForMinShare()) {
setLastTimeAtMinShare(now);
}
if (!isStarvedForFairShare()) {
setLastTimeAtFairShareThreshold(now);
}
}
/**
* Helper method to check if the queue should preempt containers
*
@ -337,4 +353,28 @@ public class FSLeafQueue extends FSQueue {
return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
getFairShare());
}
/**
* Is a queue being starved for its min share.
*/
@VisibleForTesting
boolean isStarvedForMinShare() {
return isStarved(getMinShare());
}
/**
* Is a queue being starved for its fair share threshold.
*/
@VisibleForTesting
boolean isStarvedForFairShare() {
return isStarved(
Resources.multiply(getFairShare(), getFairSharePreemptionThreshold()));
}
private boolean isStarved(Resource share) {
Resource desiredShare = Resources.min(FairScheduler.getResourceCalculator(),
scheduler.getClusterResource(), share, getDemand());
return Resources.lessThan(FairScheduler.getResourceCalculator(),
scheduler.getClusterResource(), getResourceUsage(), desiredShare);
}
}

View File

@ -78,11 +78,11 @@ public class FSParentQueue extends FSQueue {
}
@Override
public void updatePreemptionTimeouts() {
super.updatePreemptionTimeouts();
public void updatePreemptionVariables() {
super.updatePreemptionVariables();
// For child queues
for (FSQueue childQueue : childQueues) {
childQueue.updatePreemptionTimeouts();
childQueue.updatePreemptionVariables();
}
}

View File

@ -54,6 +54,7 @@ public abstract class FSQueue implements Queue, Schedulable {
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
private long minSharePreemptionTimeout = Long.MAX_VALUE;
private float fairSharePreemptionThreshold = 0.5f;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
@ -186,6 +187,14 @@ public abstract class FSQueue implements Queue, Schedulable {
this.minSharePreemptionTimeout = minSharePreemptionTimeout;
}
public float getFairSharePreemptionThreshold() {
return fairSharePreemptionThreshold;
}
public void setFairSharePreemptionThreshold(float fairSharePreemptionThreshold) {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
}
/**
* Recomputes the shares for all child queues and applications based on this
* queue's current share
@ -193,21 +202,27 @@ public abstract class FSQueue implements Queue, Schedulable {
public abstract void recomputeShares();
/**
* Update the min/fair share preemption timeouts for this queue.
* Update the min/fair share preemption timeouts and threshold for this queue.
*/
public void updatePreemptionTimeouts() {
// For min share
public void updatePreemptionVariables() {
// For min share timeout
minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getMinSharePreemptionTimeout(getName());
if (minSharePreemptionTimeout == -1 && parent != null) {
minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
}
// For fair share
// For fair share timeout
fairSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getFairSharePreemptionTimeout(getName());
if (fairSharePreemptionTimeout == -1 && parent != null) {
fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
}
// For fair share preemption threshold
fairSharePreemptionThreshold = scheduler.getAllocationConfiguration()
.getFairSharePreemptionThreshold(getName());
if (fairSharePreemptionThreshold < 0 && parent != null) {
fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
}
}
/**

View File

@ -299,7 +299,7 @@ public class FairScheduler extends
*/
protected synchronized void update() {
long start = getClock().getTime();
updatePreemptionVariables(); // Determine if any queues merit preemption
updateStarvationStats(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@ -329,48 +329,20 @@ public class FairScheduler extends
/**
* Update the preemption fields for all QueueScheduables, i.e. the times since
* each queue last was at its guaranteed share and at > 1/2 of its fair share
* for each type of task.
* each queue last was at its guaranteed share and over its fair share
* threshold for each type of task.
*/
private void updatePreemptionVariables() {
long now = getClock().getTime();
lastPreemptionUpdateTime = now;
private void updateStarvationStats() {
lastPreemptionUpdateTime = clock.getTime();
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now);
}
if (!isStarvedForFairShare(sched)) {
sched.setLastTimeAtHalfFairShare(now);
}
sched.updateStarvationStats();
}
}
/**
* Is a queue below its min share for the given task type?
*/
boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getMinShare(), sched.getDemand());
return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredShare);
}
/**
* Is a queue being starved for fair share for the given task type? This is
* defined as being below half its fair share.
*/
boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR,
clusterResource,
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), desiredFairShare);
}
/**
* Check for queues that need tasks preempted, either because they have been
* below their guaranteed share for minSharePreemptionTimeout or they have
* been below half their fair share for the fairSharePreemptionTimeout. If
* been below their fair share threshold for the fairSharePreemptionTimeout. If
* such queues exist, compute how many tasks of each type need to be preempted
* and then select the right ones using preemptTasks.
*/
@ -499,11 +471,11 @@ public class FairScheduler extends
* Return the resource amount that this queue is allowed to preempt, if any.
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the
* fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
* full fair share. If both conditions hold, we preempt the max of the two
* amounts (this shouldn't happen unless someone sets the timeouts to be
* identical for some reason).
* this min share. If it has been below its fair share preemption threshold
* for at least the fairSharePreemptionTimeout, it should preempt enough tasks
* to get up to its full fair share. If both conditions hold, we preempt the
* max of the two amounts (this shouldn't happen unless someone sets the
* timeouts to be identical for some reason).
*/
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
long minShareTimeout = sched.getMinSharePreemptionTimeout();
@ -516,7 +488,7 @@ public class FairScheduler extends
resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
sched.getFairShare(), sched.getDemand());
resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
@ -1095,6 +1067,10 @@ public class FairScheduler extends
return super.getApplicationAttempt(appAttemptId);
}
public static ResourceCalculator getResourceCalculator() {
return RESOURCE_CALCULATOR;
}
/**
* Subqueue metrics might be a little out of date because fair shares are
* recalculated at the update interval, but the root queue metrics needs to

View File

@ -181,7 +181,7 @@ public class QueueManager {
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
setPreemptionTimeout(leafQueue, parent, queueConf);
leafQueue.updatePreemptionVariables();
return leafQueue;
} else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
@ -193,7 +193,7 @@ public class QueueManager {
}
parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent);
setPreemptionTimeout(newParent, parent, queueConf);
newParent.updatePreemptionVariables();
parent = newParent;
}
}
@ -201,29 +201,6 @@ public class QueueManager {
return parent;
}
/**
* Set the min/fair share preemption timeouts for the given queue.
* If the timeout is configured in the allocation file, the queue will use
* that value; otherwise, the queue inherits the value from its parent queue.
*/
private void setPreemptionTimeout(FSQueue queue,
FSParentQueue parentQueue, AllocationConfiguration queueConf) {
// For min share
long minSharePreemptionTimeout =
queueConf.getMinSharePreemptionTimeout(queue.getQueueName());
if (minSharePreemptionTimeout == -1) {
minSharePreemptionTimeout = parentQueue.getMinSharePreemptionTimeout();
}
queue.setMinSharePreemptionTimeout(minSharePreemptionTimeout);
// For fair share
long fairSharePreemptionTimeout =
queueConf.getFairSharePreemptionTimeout(queue.getQueueName());
if (fairSharePreemptionTimeout == -1) {
fairSharePreemptionTimeout = parentQueue.getFairSharePreemptionTimeout();
}
queue.setFairSharePreemptionTimeout(fairSharePreemptionTimeout);
}
/**
* Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to
@ -409,7 +386,8 @@ public class QueueManager {
// Update steady fair shares for all queues
rootQueue.recomputeSteadyShares();
// Update the fair share preemption timeouts for all queues recursively
rootQueue.updatePreemptionTimeouts();
// Update the fair share preemption timeouts and preemption for all queues
// recursively
rootQueue.updatePreemptionVariables();
}
}

View File

@ -187,13 +187,15 @@ public class TestAllocationFileLoaderService {
out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("</queue>");
// Create hierarchical queues G,H, with different min/fair share preemption
// timeouts
// timeouts and preemption thresholds
out.println("<queue name=\"queueG\">");
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionThreshold>0.6</fairSharePreemptionThreshold>");
out.println(" <queue name=\"queueH\">");
out.println(" <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>0.7</fairSharePreemptionThreshold>");
out.println(" </queue>");
out.println("</queue>");
// Set default limit of apps per queue to 15
@ -211,6 +213,8 @@ public class TestAllocationFileLoaderService {
+ "</defaultMinSharePreemptionTimeout>");
// Set default fair share preemption timeout to 5 minutes
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
// Set default fair share preemption threshold to 0.4
out.println("<defaultFairSharePreemptionThreshold>0.4</defaultFairSharePreemptionThreshold>");
// Set default scheduling policy to DRF
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
@ -299,6 +303,26 @@ public class TestAllocationFileLoaderService {
assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
assertEquals(.4f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueF"), 0.01);
assertEquals(.6f,
queueConf.getFairSharePreemptionThreshold("root.queueG"), 0.01);
assertEquals(.7f,
queueConf.getFairSharePreemptionThreshold("root.queueG.queueH"), 0.01);
assertTrue(queueConf.getConfiguredQueues()
.get(FSQueueType.PARENT)
.contains("root.queueF"));
@ -346,9 +370,10 @@ public class TestAllocationFileLoaderService {
out.println("<pool name=\"queueD\">");
out.println("<maxRunningApps>3</maxRunningApps>");
out.println("</pool>");
// Give queue E a preemption timeout of one minute
// Give queue E a preemption timeout of one minute and 0.3f threshold
out.println("<pool name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("<fairSharePreemptionThreshold>0.3</fairSharePreemptionThreshold>");
out.println("</pool>");
// Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
@ -363,6 +388,8 @@ public class TestAllocationFileLoaderService {
+ "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>");
// Set default fair share preemption threshold to 0.6f
out.println("<defaultFairSharePreemptionThreshold>0.6</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
@ -429,6 +456,20 @@ public class TestAllocationFileLoaderService {
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
assertEquals(.6f, queueConf.getFairSharePreemptionThreshold("root"), 0.01);
assertEquals(-1, queueConf.getFairSharePreemptionThreshold("root."
+ YarnConfiguration.DEFAULT_QUEUE_NAME), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueA"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueB"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueC"), 0.01);
assertEquals(-1,
queueConf.getFairSharePreemptionThreshold("root.queueD"), 0.01);
assertEquals(.3f,
queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01);
}
@Test

View File

@ -18,50 +18,66 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestFSLeafQueue {
private FSLeafQueue schedulable = null;
private Resource maxResource = Resources.createResource(10);
public class TestFSLeafQueue extends FairSchedulerTestBase {
private final static String ALLOC_FILE = new File(TEST_DIR,
TestFSLeafQueue.class.getName() + ".xml").getAbsolutePath();
private Resource maxResource = Resources.createResource(1024 * 8);
@Before
public void setup() throws IOException {
FairScheduler scheduler = new FairScheduler();
Configuration conf = createConfiguration();
// All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
ResourceManager resourceManager = new ResourceManager();
resourceManager.init(conf);
((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
conf = createConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
}
String queueName = "root.queue1";
scheduler.allocConf = mock(AllocationConfiguration.class);
when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
schedulable = new FSLeafQueue(queueName, scheduler, null);
@After
public void teardown() {
if (resourceManager != null) {
resourceManager.stop();
resourceManager = null;
}
conf = null;
}
@Test
public void testUpdateDemand() {
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
scheduler.allocConf = mock(AllocationConfiguration.class);
String queueName = "root.queue1";
when(scheduler.allocConf.getMaxResources(queueName)).thenReturn(maxResource);
when(scheduler.allocConf.getMinResources(queueName)).thenReturn(Resources.none());
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
FSAppAttempt app = mock(FSAppAttempt.class);
Mockito.when(app.getDemand()).thenReturn(maxResource);
@ -74,10 +90,136 @@ public class TestFSLeafQueue {
Resources.equals(schedulable.getDemand(), maxResource));
}
private Configuration createConfiguration() {
Configuration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
return conf;
@Test (timeout = 5000)
public void test() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
scheduler.update();
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Queue A should be above min share, B below.
FSLeafQueue queueA =
scheduler.getQueueManager().getLeafQueue("queueA", false);
FSLeafQueue queueB =
scheduler.getQueueManager().getLeafQueue("queueB", false);
assertFalse(queueA.isStarvedForMinShare());
assertTrue(queueB.isStarvedForMinShare());
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// Now B should have min share ( = demand here)
assertFalse(queueB.isStarvedForMinShare());
}
@Test (timeout = 5000)
public void testIsStarvedForFairShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.2</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.8</weight>");
out.println("<fairSharePreemptionThreshold>.4</fairSharePreemptionThreshold>");
out.println("<queue name=\"queueB1\">");
out.println("</queue>");
out.println("<queue name=\"queueB2\">");
out.println("<fairSharePreemptionThreshold>.6</fairSharePreemptionThreshold>");
out.println("</queue>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
scheduler.update();
// Queue A wants 4 * 1024. Node update gives this all to A
createSchedulingRequest(1 * 1024, "queueA", "user1", 4);
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
for (int i = 0; i < 4; i ++) {
scheduler.handle(nodeEvent2);
}
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
assertEquals(4 * 1024, queueA.getResourceUsage().getMemory());
// Both queue B1 and queue B2 want 3 * 1024
createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 3);
createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 3);
scheduler.update();
for (int i = 0; i < 4; i ++) {
scheduler.handle(nodeEvent2);
}
FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", false);
FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", false);
assertEquals(2 * 1024, queueB1.getResourceUsage().getMemory());
assertEquals(2 * 1024, queueB2.getResourceUsage().getMemory());
// For queue B1, the fairSharePreemptionThreshold is 0.4, and the fair share
// threshold is 1.6 * 1024
assertFalse(queueB1.isStarvedForFairShare());
// For queue B2, the fairSharePreemptionThreshold is 0.6, and the fair share
// threshold is 2.4 * 1024
assertTrue(queueB2.isStarvedForFairShare());
// Node checks in again
scheduler.handle(nodeEvent2);
scheduler.handle(nodeEvent2);
assertEquals(3 * 1024, queueB1.getResourceUsage().getMemory());
assertEquals(3 * 1024, queueB2.getResourceUsage().getMemory());
// Both queue B1 and queue B2 usages go to 3 * 1024
assertFalse(queueB1.isStarvedForFairShare());
assertFalse(queueB2.isStarvedForFairShare());
}
}

View File

@ -1061,9 +1061,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println(" </queue>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionThreshold>.6</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
@ -1080,125 +1082,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(100000, root.getFairSharePreemptionTimeout());
assertEquals(120000, root.getMinSharePreemptionTimeout());
}
@Test (timeout = 5000)
public void testIsStarvedForMinShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<minResources>2048mb,0vcores</minResources>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Queue A should be above min share, B below.
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueA")) {
assertEquals(false, scheduler.isStarvedForMinShare(p));
}
else if (p.getName().equals("root.queueB")) {
assertEquals(true, scheduler.isStarvedForMinShare(p));
}
}
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// Now B should have min share ( = demand here)
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueB")) {
assertEquals(false, scheduler.isStarvedForMinShare(p));
}
}
}
@Test (timeout = 5000)
public void testIsStarvedForFairShare() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.75</weight>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue A wants 3 * 1024. Node update gives this all to A
createSchedulingRequest(3 * 1024, "queueA", "user1");
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
// Queue B arrives and wants 1 * 1024
createSchedulingRequest(1 * 1024, "queueB", "user1");
scheduler.update();
Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
assertEquals(3, queues.size());
// Queue A should be above fair share, B below.
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueA")) {
assertEquals(false, scheduler.isStarvedForFairShare(p));
}
else if (p.getName().equals("root.queueB")) {
assertEquals(true, scheduler.isStarvedForFairShare(p));
}
}
// Node checks in again, should allocate for B
scheduler.handle(nodeEvent2);
// B should not be starved for fair share, since entire demand is
// satisfied.
for (FSLeafQueue p : queues) {
if (p.getName().equals("root.queueB")) {
assertEquals(false, scheduler.isStarvedForFairShare(p));
}
}
assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01);
}
@Test (timeout = 5000)
@ -1385,7 +1269,8 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>");
out.println("</queue>");
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
@ -1468,8 +1353,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println("<weight>.25</weight>");
out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
out.println("</allocations>");
out.close();
@ -1753,8 +1639,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@Test
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
MockClock clock = new MockClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
@ -1842,6 +1726,32 @@ public class TestFairScheduler extends FairSchedulerTestBase {
.getFairSharePreemptionTimeout());
}
@Test
public void testPreemptionVariablesForQueueCreatedRuntime() throws Exception {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Set preemption variables for the root queue
FSParentQueue root = scheduler.getQueueManager().getRootQueue();
root.setMinSharePreemptionTimeout(10000);
root.setFairSharePreemptionTimeout(15000);
root.setFairSharePreemptionThreshold(.6f);
// User1 submits one application
ApplicationAttemptId appAttemptId = createAppAttemptId(1, 1);
createApplicationWithAMResource(appAttemptId, "default", "user1", null);
// The user1 queue should inherit the configurations from the root queue
FSLeafQueue userQueue =
scheduler.getQueueManager().getLeafQueue("user1", true);
assertEquals(1, userQueue.getRunnableAppSchedulables().size());
assertEquals(10000, userQueue.getMinSharePreemptionTimeout());
assertEquals(15000, userQueue.getFairSharePreemptionTimeout());
assertEquals(.6f, userQueue.getFairSharePreemptionThreshold(), 0.001);
}
@Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
scheduler.init(conf);

View File

@ -277,6 +277,12 @@ Allocation file format
threshold before it will try to preempt containers to take resources from other
queues. If not set, the queue will inherit the value from its parent queue.
* fairSharePreemptionThreshold: the fair share preemption threshold for the
queue. If the queue waits fairSharePreemptionTimeout without receiving
fairSharePreemptionThreshold*fairShare resources, it is allowed to preempt
containers to take resources from other queues. If not set, the queue will
inherit the value from its parent queue.
* <<User elements>>, which represent settings governing the behavior of individual
users. They can contain a single property: maxRunningApps, a limit on the
number of running apps for a particular user.
@ -292,6 +298,10 @@ Allocation file format
preemption timeout for the root queue; overridden by minSharePreemptionTimeout
element in root queue.
* <<A defaultFairSharePreemptionThreshold element>>, which sets the fair share
preemption threshold for the root queue; overridden by fairSharePreemptionThreshold
element in root queue.
* <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overriden by maxRunningApps element in each queue.