YARN-2395. FairScheduler: Preemption timeout should be configurable per queue. (Wei Yan via kasha)

(cherry picked from commit 0f34e6f387)
This commit is contained in:
Karthik Kambatla 2014-08-30 01:17:13 -07:00
parent 38853c97c2
commit 7b9e763138
10 changed files with 474 additions and 73 deletions

View File

@ -38,6 +38,9 @@ Release 2.6.0 - UNRELEASED
YARN-2393. FairScheduler: Add the notion of steady fair share. YARN-2393. FairScheduler: Add the notion of steady fair share.
(Wei Yan via kasha) (Wei Yan via kasha)
YARN-2395. FairScheduler: Preemption timeout should be configurable per
queue. (Wei Yan via kasha)
IMPROVEMENTS IMPROVEMENTS
YARN-2242. Improve exception information on AM launch crashes. (Li Lu YARN-2242. Improve exception information on AM launch crashes. (Li Lu

View File

@ -65,13 +65,10 @@ public class AllocationConfiguration {
// preempt other jobs' tasks. // preempt other jobs' tasks.
private final Map<String, Long> minSharePreemptionTimeouts; private final Map<String, Long> minSharePreemptionTimeouts;
// Default min share preemption timeout for queues where it is not set // Fair share preemption timeout for each queue in seconds. If a job in the
// explicitly. // queue waits this long without receiving its fair share threshold, it is
private final long defaultMinSharePreemptionTimeout; // allowed to preempt other jobs' tasks.
private final Map<String, Long> fairSharePreemptionTimeouts;
// Preemption timeout for jobs below fair share in seconds. If a job remains
// below half its fair share for this long, it is allowed to preempt tasks.
private final long fairSharePreemptionTimeout;
private final Map<String, SchedulingPolicy> schedulingPolicies; private final Map<String, SchedulingPolicy> schedulingPolicies;
@ -94,8 +91,8 @@ public class AllocationConfiguration {
Map<String, SchedulingPolicy> schedulingPolicies, Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy, SchedulingPolicy defaultSchedulingPolicy,
Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
QueuePlacementPolicy placementPolicy, QueuePlacementPolicy placementPolicy,
Map<FSQueueType, Set<String>> configuredQueues) { Map<FSQueueType, Set<String>> configuredQueues) {
this.minQueueResources = minQueueResources; this.minQueueResources = minQueueResources;
@ -110,9 +107,8 @@ public class AllocationConfiguration {
this.defaultSchedulingPolicy = defaultSchedulingPolicy; this.defaultSchedulingPolicy = defaultSchedulingPolicy;
this.schedulingPolicies = schedulingPolicies; this.schedulingPolicies = schedulingPolicies;
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts;
this.queueAcls = queueAcls; this.queueAcls = queueAcls;
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.placementPolicy = placementPolicy; this.placementPolicy = placementPolicy;
this.configuredQueues = configuredQueues; this.configuredQueues = configuredQueues;
} }
@ -129,8 +125,7 @@ public class AllocationConfiguration {
queueMaxAMShareDefault = -1.0f; queueMaxAMShareDefault = -1.0f;
queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>(); queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
minSharePreemptionTimeouts = new HashMap<String, Long>(); minSharePreemptionTimeouts = new HashMap<String, Long>();
defaultMinSharePreemptionTimeout = Long.MAX_VALUE; fairSharePreemptionTimeouts = new HashMap<String, Long>();
fairSharePreemptionTimeout = Long.MAX_VALUE;
schedulingPolicies = new HashMap<String, SchedulingPolicy>(); schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY; defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
configuredQueues = new HashMap<FSQueueType, Set<String>>(); configuredQueues = new HashMap<FSQueueType, Set<String>>();
@ -159,23 +154,22 @@ public class AllocationConfiguration {
} }
/** /**
* Get a queue's min share preemption timeout, in milliseconds. This is the * Get a queue's min share preemption timeout configured in the allocation
* time after which jobs in the queue may kill other queues' tasks if they * file, in milliseconds. Return -1 if not set.
* are below their min share.
*/ */
public long getMinSharePreemptionTimeout(String queueName) { public long getMinSharePreemptionTimeout(String queueName) {
Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName); Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName);
return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout return (minSharePreemptionTimeout == null) ? -1 : minSharePreemptionTimeout;
: minSharePreemptionTimeout;
} }
/** /**
* Get the fair share preemption, in milliseconds. This is the time * Get a queue's fair share preemption timeout configured in the allocation
* after which any job may kill other jobs' tasks if it is below half * file, in milliseconds. Return -1 if not set.
* its fair share.
*/ */
public long getFairSharePreemptionTimeout() { public long getFairSharePreemptionTimeout(String queueName) {
return fairSharePreemptionTimeout; Long fairSharePreemptionTimeout = fairSharePreemptionTimeouts.get(queueName);
return (fairSharePreemptionTimeout == null) ?
-1 : fairSharePreemptionTimeout;
} }
public ResourceWeights getQueueWeight(String queue) { public ResourceWeights getQueueWeight(String queue) {

View File

@ -217,27 +217,28 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>(); Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>(); Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>(); Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Long> fairSharePreemptionTimeouts = new HashMap<String, Long>();
Map<String, Map<QueueACL, AccessControlList>> queueAcls = Map<String, Map<QueueACL, AccessControlList>> queueAcls =
new HashMap<String, Map<QueueACL, AccessControlList>>(); new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE; int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE;
float queueMaxAMShareDefault = -1.0f; float queueMaxAMShareDefault = -1.0f;
long fairSharePreemptionTimeout = Long.MAX_VALUE; long defaultFairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
QueuePlacementPolicy newPlacementPolicy = null; QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc. // Remember all queue names so we can display them on web UI, etc.
// configuredQueues is segregated based on whether it is a leaf queue // configuredQueues is segregated based on whether it is a leaf queue
// or a parent queue. This information is used for creating queues // or a parent queue. This information is used for creating queues
// and also for making queue placement decisions(QueuePlacementRule.java). // and also for making queue placement decisions(QueuePlacementRule.java).
Map<FSQueueType, Set<String>> configuredQueues = Map<FSQueueType, Set<String>> configuredQueues =
new HashMap<FSQueueType, Set<String>>(); new HashMap<FSQueueType, Set<String>>();
for (FSQueueType queueType : FSQueueType.values()) { for (FSQueueType queueType : FSQueueType.values()) {
configuredQueues.put(queueType, new HashSet<String>()); configuredQueues.put(queueType, new HashSet<String>());
} }
// Read and parse the allocations file. // Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance(); DocumentBuilderFactory.newInstance();
@ -276,10 +277,16 @@ public class AllocationFileLoaderService extends AbstractService {
String text = ((Text)element.getFirstChild()).getData().trim(); String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text); int val = Integer.parseInt(text);
userMaxAppsDefault = val; userMaxAppsDefault = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { } else if ("defaultFairSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim(); String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L; long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeout = val; defaultFairSharePreemptionTimeout = val;
} else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) {
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultFairSharePreemptionTimeout = val;
}
} else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) { } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
String text = ((Text)element.getFirstChild()).getData().trim(); String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L; long val = Long.parseLong(text) * 1000L;
@ -304,7 +311,7 @@ public class AllocationFileLoaderService extends AbstractService {
} }
} }
} }
// Load queue elements. A root queue can either be included or omitted. If // Load queue elements. A root queue can either be included or omitted. If
// it's included, all other queues must be inside it. // it's included, all other queues must be inside it.
for (Element element : queueElements) { for (Element element : queueElements) {
@ -318,10 +325,10 @@ public class AllocationFileLoaderService extends AbstractService {
} }
loadQueue(parent, element, minQueueResources, maxQueueResources, loadQueue(parent, element, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
configuredQueues); queueAcls, configuredQueues);
} }
// Load placement policy and pass it configured queues // Load placement policy and pass it configured queues
Configuration conf = getConfig(); Configuration conf = getConfig();
if (placementPolicyElement != null) { if (placementPolicyElement != null) {
@ -331,11 +338,22 @@ public class AllocationFileLoaderService extends AbstractService {
newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
configuredQueues); configuredQueues);
} }
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources, // Set the min/fair share preemption timeout for the root queue
queueMaxApps, userMaxApps, queueWeights, queueMaxAMShares, userMaxAppsDefault, if (!minSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)){
queueMaxAppsDefault, queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts, minSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
}
if (!fairSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) {
fairSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE,
defaultFairSharePreemptionTimeout);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources,
maxQueueResources, queueMaxApps, userMaxApps, queueWeights,
queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault,
queueMaxAMShareDefault, queuePolicies, defaultSchedPolicy,
minSharePreemptionTimeouts, fairSharePreemptionTimeouts, queueAcls,
newPlacementPolicy, configuredQueues); newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime(); lastSuccessfulReload = clock.getTime();
@ -353,6 +371,7 @@ public class AllocationFileLoaderService extends AbstractService {
Map<String, ResourceWeights> queueWeights, Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies, Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts, Map<String, Long> minSharePreemptionTimeouts,
Map<String, Long> fairSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls, Map<String, Map<QueueACL, AccessControlList>> queueAcls,
Map<FSQueueType, Set<String>> configuredQueues) Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException { throws AllocationConfigurationException {
@ -395,6 +414,10 @@ public class AllocationFileLoaderService extends AbstractService {
String text = ((Text)field.getFirstChild()).getData().trim(); String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L; long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(queueName, val); minSharePreemptionTimeouts.put(queueName, val);
} else if ("fairSharePreemptionTimeout".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
fairSharePreemptionTimeouts.put(queueName, val);
} else if ("schedulingPolicy".equals(field.getTagName()) } else if ("schedulingPolicy".equals(field.getTagName())
|| "schedulingMode".equals(field.getTagName())) { || "schedulingMode".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim(); String text = ((Text)field.getFirstChild()).getData().trim();
@ -410,8 +433,8 @@ public class AllocationFileLoaderService extends AbstractService {
"pool".equals(field.getTagName())) { "pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources, loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
queuePolicies, minSharePreemptionTimeouts, queueAcls, queuePolicies, minSharePreemptionTimeouts,
configuredQueues); fairSharePreemptionTimeouts, queueAcls, configuredQueues);
configuredQueues.get(FSQueueType.PARENT).add(queueName); configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false; isLeaf = false;
} }

View File

@ -77,6 +77,15 @@ public class FSParentQueue extends FSQueue {
} }
} }
/**
 * Refreshes the min/fair share preemption timeouts of this queue and then of
 * every child queue. This queue is refreshed first, so each child is
 * refreshed after its parent already holds its new values.
 */
@Override
public void updatePreemptionTimeouts() {
  // Refresh this queue first.
  super.updatePreemptionTimeouts();
  // Then refresh each child in turn.
  for (FSQueue child : childQueues) {
    child.updatePreemptionTimeouts();
  }
}
@Override @Override
public Resource getDemand() { public Resource getDemand() {
return demand; return demand;

View File

@ -52,6 +52,9 @@ public abstract class FSQueue implements Queue, Schedulable {
protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY; protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
private long minSharePreemptionTimeout = Long.MAX_VALUE;
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name; this.name = name;
this.scheduler = scheduler; this.scheduler = scheduler;
@ -166,13 +169,47 @@ public abstract class FSQueue implements Queue, Schedulable {
public boolean hasAccess(QueueACL acl, UserGroupInformation user) { public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
} }
/**
 * @return the fair share preemption timeout currently held by this queue
 */
public long getFairSharePreemptionTimeout() {
  return this.fairSharePreemptionTimeout;
}
/**
 * Replaces the fair share preemption timeout held by this queue.
 *
 * @param fairSharePreemptionTimeout the new timeout value to store
 */
public void setFairSharePreemptionTimeout(
    final long fairSharePreemptionTimeout) {
  this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
}
/**
 * @return the min share preemption timeout currently held by this queue
 */
public long getMinSharePreemptionTimeout() {
  return this.minSharePreemptionTimeout;
}
/**
 * Replaces the min share preemption timeout held by this queue.
 *
 * @param minSharePreemptionTimeout the new timeout value to store
 */
public void setMinSharePreemptionTimeout(
    final long minSharePreemptionTimeout) {
  this.minSharePreemptionTimeout = minSharePreemptionTimeout;
}
/** /**
* Recomputes the shares for all child queues and applications based on this * Recomputes the shares for all child queues and applications based on this
* queue's current share * queue's current share
*/ */
public abstract void recomputeShares(); public abstract void recomputeShares();
/**
 * Update the min/fair share preemption timeouts for this queue.
 *
 * For each timeout, the value configured for this queue in the allocation
 * configuration is used when present. The configuration reports -1 when no
 * value is set; in that case the value is inherited from the parent queue.
 * A queue with neither a configured value nor a parent falls back to
 * Long.MAX_VALUE, so the -1 "not set" sentinel is never stored as an
 * effective timeout (it would otherwise compare as already expired).
 */
public void updatePreemptionTimeouts() {
  // For min share
  long configuredMinShareTimeout = scheduler.getAllocationConfiguration()
      .getMinSharePreemptionTimeout(getName());
  if (configuredMinShareTimeout != -1) {
    minSharePreemptionTimeout = configuredMinShareTimeout;
  } else if (parent != null) {
    minSharePreemptionTimeout = parent.getMinSharePreemptionTimeout();
  } else {
    // No configured value and nothing to inherit from.
    minSharePreemptionTimeout = Long.MAX_VALUE;
  }

  // For fair share
  long configuredFairShareTimeout = scheduler.getAllocationConfiguration()
      .getFairSharePreemptionTimeout(getName());
  if (configuredFairShareTimeout != -1) {
    fairSharePreemptionTimeout = configuredFairShareTimeout;
  } else if (parent != null) {
    fairSharePreemptionTimeout = parent.getFairSharePreemptionTimeout();
  } else {
    // No configured value and nothing to inherit from.
    fairSharePreemptionTimeout = Long.MAX_VALUE;
  }
}
/** /**
* Gets the children of this queue, if any. * Gets the children of this queue, if any.
*/ */

View File

@ -506,9 +506,8 @@ public class FairScheduler extends
* identical for some reason). * identical for some reason).
*/ */
protected Resource resToPreempt(FSLeafQueue sched, long curTime) { protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName(); long minShareTimeout = sched.getMinSharePreemptionTimeout();
long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue); long fairShareTimeout = sched.getFairSharePreemptionTimeout();
long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none(); Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none(); Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {

View File

@ -181,6 +181,7 @@ public class QueueManager {
parent.addChildQueue(leafQueue); parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue); queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue); leafQueues.add(leafQueue);
setPreemptionTimeout(leafQueue, parent, queueConf);
return leafQueue; return leafQueue;
} else { } else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent); FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
@ -192,6 +193,7 @@ public class QueueManager {
} }
parent.addChildQueue(newParent); parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent); queues.put(newParent.getName(), newParent);
setPreemptionTimeout(newParent, parent, queueConf);
parent = newParent; parent = newParent;
} }
} }
@ -199,6 +201,29 @@ public class QueueManager {
return parent; return parent;
} }
/**
 * Initializes the min share and fair share preemption timeouts of a queue.
 * For each timeout, the value the allocation configuration holds for the
 * queue is applied; when the configuration reports -1 (not set), the
 * corresponding value of the parent queue is applied instead.
 *
 * @param queue the queue whose timeouts are being set
 * @param parentQueue the queue to inherit from when a timeout is not set
 * @param queueConf the allocation configuration to look the timeouts up in
 */
private void setPreemptionTimeout(FSQueue queue,
    FSParentQueue parentQueue, AllocationConfiguration queueConf) {
  // Min share: configured value, else the parent's.
  long configuredMinShare =
      queueConf.getMinSharePreemptionTimeout(queue.getQueueName());
  queue.setMinSharePreemptionTimeout(configuredMinShare == -1
      ? parentQueue.getMinSharePreemptionTimeout()
      : configuredMinShare);

  // Fair share: configured value, else the parent's.
  long configuredFairShare =
      queueConf.getFairSharePreemptionTimeout(queue.getQueueName());
  queue.setFairSharePreemptionTimeout(configuredFairShare == -1
      ? parentQueue.getFairSharePreemptionTimeout()
      : configuredFairShare);
}
/** /**
* Make way for the given queue if possible, by removing incompatible * Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to * queues with no apps in them. Incompatibility could be due to
@ -384,5 +409,7 @@ public class QueueManager {
// Update steady fair shares for all queues // Update steady fair shares for all queues
rootQueue.recomputeSteadyShares(); rootQueue.recomputeSteadyShares();
// Update the min/fair share preemption timeouts for all queues recursively
rootQueue.updatePreemptionTimeouts();
} }
} }

View File

@ -186,9 +186,14 @@ public class TestAllocationFileLoaderService {
//Make queue F a parent queue without configured leaf queues using the 'type' attribute //Make queue F a parent queue without configured leaf queues using the 'type' attribute
out.println("<queue name=\"queueF\" type=\"parent\" >"); out.println("<queue name=\"queueF\" type=\"parent\" >");
out.println("</queue>"); out.println("</queue>");
//Create hierarchical queues G,H // Create hierarchical queues G,H, with different min/fair share preemption
// timeouts
out.println("<queue name=\"queueG\">"); out.println("<queue name=\"queueG\">");
out.println("<fairSharePreemptionTimeout>120</fairSharePreemptionTimeout>");
out.println("<minSharePreemptionTimeout>50</minSharePreemptionTimeout>");
out.println(" <queue name=\"queueH\">"); out.println(" <queue name=\"queueH\">");
out.println(" <fairSharePreemptionTimeout>180</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>40</minSharePreemptionTimeout>");
out.println(" </queue>"); out.println(" </queue>");
out.println("</queue>"); out.println("</queue>");
// Set default limit of apps per queue to 15 // Set default limit of apps per queue to 15
@ -204,8 +209,8 @@ public class TestAllocationFileLoaderService {
// Set default min share preemption timeout to 2 minutes // Set default min share preemption timeout to 2 minutes
out.println("<defaultMinSharePreemptionTimeout>120" out.println("<defaultMinSharePreemptionTimeout>120"
+ "</defaultMinSharePreemptionTimeout>"); + "</defaultMinSharePreemptionTimeout>");
// Set fair share preemption timeout to 5 minutes // Set default fair share preemption timeout to 5 minutes
out.println("<fairSharePreemptionTimeout>300</fairSharePreemptionTimeout>"); out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
// Set default scheduling policy to DRF // Set default scheduling policy to DRF
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>"); out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>"); out.println("</allocations>");
@ -270,16 +275,30 @@ public class TestAllocationFileLoaderService {
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC", assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString()); QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." + assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME)); YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout()); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueF"));
assertEquals(50000, queueConf.getMinSharePreemptionTimeout("root.queueG"));
assertEquals(40000, queueConf.getMinSharePreemptionTimeout("root.queueG.queueH"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueF"));
assertEquals(120000, queueConf.getFairSharePreemptionTimeout("root.queueG"));
assertEquals(180000, queueConf.getFairSharePreemptionTimeout("root.queueG.queueH"));
assertTrue(queueConf.getConfiguredQueues() assertTrue(queueConf.getConfiguredQueues()
.get(FSQueueType.PARENT) .get(FSQueueType.PARENT)
.contains("root.queueF")); .contains("root.queueF"));
@ -393,16 +412,23 @@ public class TestAllocationFileLoaderService {
assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC", assertEquals("alice,bob admins", queueConf.getQueueAcl("root.queueC",
QueueACL.SUBMIT_APPLICATIONS).getAclString()); QueueACL.SUBMIT_APPLICATIONS).getAclString());
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root." + assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME)); YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueB")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueB"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueC")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueC"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueD")); assertEquals(-1, queueConf.getMinSharePreemptionTimeout("root.queueD"));
assertEquals(120000, queueConf.getMinSharePreemptionTimeout("root.queueA"));
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE")); assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
assertEquals(300000, queueConf.getFairSharePreemptionTimeout("root"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root." +
YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueA"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueB"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueC"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueD"));
assertEquals(-1, queueConf.getFairSharePreemptionTimeout("root.queueE"));
} }
@Test @Test

View File

@ -1059,7 +1059,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println(" <queue name=\"child2\">"); out.println(" <queue name=\"child2\">");
out.println(" <minResources>1024mb,4vcores</minResources>"); out.println(" <minResources>1024mb,4vcores</minResources>");
out.println(" </queue>"); out.println(" </queue>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println("</queue>"); out.println("</queue>");
out.println("<defaultFairSharePreemptionTimeout>300</defaultFairSharePreemptionTimeout>");
out.println("<defaultMinSharePreemptionTimeout>200</defaultMinSharePreemptionTimeout>");
out.println("</allocations>"); out.println("</allocations>");
out.close(); out.close();
@ -1073,6 +1077,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertNotNull(queueManager.getLeafQueue("child1", false)); assertNotNull(queueManager.getLeafQueue("child1", false));
assertNotNull(queueManager.getLeafQueue("child2", false)); assertNotNull(queueManager.getLeafQueue("child2", false));
assertEquals(100000, root.getFairSharePreemptionTimeout());
assertEquals(120000, root.getMinSharePreemptionTimeout());
} }
@Test (timeout = 5000) @Test (timeout = 5000)
@ -1378,7 +1385,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println("<queue name=\"queueB\">"); out.println("<queue name=\"queueB\">");
out.println("<weight>2</weight>"); out.println("<weight>2</weight>");
out.println("</queue>"); out.println("</queue>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>"); out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("</allocations>"); out.println("</allocations>");
out.close(); out.close();
@ -1462,7 +1469,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println("<minResources>1024mb,0vcores</minResources>"); out.println("<minResources>1024mb,0vcores</minResources>");
out.println("</queue>"); out.println("</queue>");
out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>"); out.print("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>"); out.print("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("</allocations>"); out.println("</allocations>");
out.close(); out.close();
@ -1489,7 +1496,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3); scheduler.handle(nodeEvent3);
// Queue A and B each request three containers // Queue A and B each request three containers
ApplicationAttemptId app1 = ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
@ -1563,6 +1569,279 @@ public class TestFairScheduler extends FairSchedulerTestBase {
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
} }
/**
 * Tests the various timing of decision to preempt tasks.
 *
 * <p>The allocation file sets min share and fair share preemption timeouts at
 * three levels: allocation-wide defaults, the parent queue queueB, and the
 * leaf queues queueB.queueB1 and queueB.queueB2. The test first checks the
 * effective timeout reported by every queue (a queue without its own setting
 * reports its parent's value, and root reports the defaults), then lets
 * queueA occupy the whole node and advances a mock clock to check at which
 * point {@code resToPreempt} starts returning resources for queueB1, queueB2
 * and queueC.
 *
 * <p>Timeouts are configured in seconds in the allocation file; the values
 * asserted on the queues are 1000 times larger.
 */
@Test
public void testPreemptionDecisionWithVariousTimeout() throws Exception {
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  MockClock clock = new MockClock();
  scheduler.setClock(clock);

  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"default\">");
    out.println("<maxResources>0mb,0vcores</maxResources>");
    out.println("</queue>");
    out.println("<queue name=\"queueA\">");
    out.println("<weight>1</weight>");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("</queue>");
    // queueB overrides both defaults; its children override one timeout each
    // and are expected to pick up the other one from queueB.
    out.println("<queue name=\"queueB\">");
    out.println("<weight>2</weight>");
    out.println("<minSharePreemptionTimeout>10</minSharePreemptionTimeout>");
    out.println("<fairSharePreemptionTimeout>25</fairSharePreemptionTimeout>");
    out.println("<queue name=\"queueB1\">");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
    out.println("</queue>");
    out.println("<queue name=\"queueB2\">");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("<fairSharePreemptionTimeout>20</fairSharePreemptionTimeout>");
    out.println("</queue>");
    out.println("</queue>");
    out.println("<queue name=\"queueC\">");
    out.println("<weight>1</weight>");
    out.println("<minResources>1024mb,0vcores</minResources>");
    out.println("</queue>");
    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
    out.print("<defaultFairSharePreemptionTimeout>30</defaultFairSharePreemptionTimeout>");
    out.println("</allocations>");
  } finally {
    // Always release the file handle, even if building the file fails.
    out.close();
  }

  scheduler.init(conf);
  scheduler.start();
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // Check the min/fair share preemption timeout for each queue
  QueueManager queueMgr = scheduler.getQueueManager();
  assertEquals(30000, queueMgr.getQueue("root")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("default")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueA")
      .getFairSharePreemptionTimeout());
  assertEquals(25000, queueMgr.getQueue("queueB")
      .getFairSharePreemptionTimeout());
  assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
      .getFairSharePreemptionTimeout());
  assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueC")
      .getFairSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("root")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("default")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("queueA")
      .getMinSharePreemptionTimeout());
  assertEquals(10000, queueMgr.getQueue("queueB")
      .getMinSharePreemptionTimeout());
  assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
      .getMinSharePreemptionTimeout());
  assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("queueC")
      .getMinSharePreemptionTimeout());

  // Create one big node
  RMNode node1 =
      MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
          "127.0.0.1");
  NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
  scheduler.handle(nodeEvent1);

  // Queue A takes all resources
  for (int i = 0; i < 6; i++) {
    createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
  }

  scheduler.update();

  // Sufficient node check-ins to fully schedule containers
  NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
  for (int i = 0; i < 6; i++) {
    scheduler.handle(nodeUpdate1);
  }

  // Now new requests arrive from queues B1, B2 and C
  createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
  createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
  createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
  createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
  createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
  createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
  createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
  createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
  createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);

  scheduler.update();

  FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
  FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
  FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);

  // No time has passed yet, so no queue may preempt anything.
  assertTrue(Resources.equals(
      Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime())));
  assertTrue(Resources.equals(
      Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime())));
  assertTrue(Resources.equals(
      Resources.none(), scheduler.resToPreempt(queueC, clock.getTime())));

  // NOTE(review): the steps below assume MockClock.tick() takes seconds;
  // the running totals are 6, 11, 16, 21, 26 and 31, i.e. each step lands
  // just past the next configured timeout - confirm against MockClock.

  // After 5 seconds, queueB1 wants to preempt min share
  scheduler.update();
  clock.tick(6);
  assertEquals(
      1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
  assertEquals(
      0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
  assertEquals(
      0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());

  // After 10 seconds, queueB2 wants to preempt min share
  scheduler.update();
  clock.tick(5);
  assertEquals(
      1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
  assertEquals(
      1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
  assertEquals(
      0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());

  // After 15 seconds, queueC wants to preempt min share
  scheduler.update();
  clock.tick(5);
  assertEquals(
      1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
  assertEquals(
      1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
  assertEquals(
      1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());

  // After 20 seconds, queueB2 should want to preempt fair share
  scheduler.update();
  clock.tick(5);
  assertEquals(
      1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
  assertEquals(
      1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
  assertEquals(
      1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());

  // After 25 seconds, queueB1 should want to preempt fair share
  scheduler.update();
  clock.tick(5);
  assertEquals(
      1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
  assertEquals(
      1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
  assertEquals(
      1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());

  // After 30 seconds, queueC should want to preempt fair share
  scheduler.update();
  clock.tick(5);
  assertEquals(
      1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
  assertEquals(
      1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
  assertEquals(
      1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
}
/**
 * Tests allocation files that contain the allocation-wide
 * fairSharePreemptionTimeout element next to the newer
 * defaultFairSharePreemptionTimeout element.
 *
 * <p>In both phases the value of defaultFairSharePreemptionTimeout is the one
 * expected on the queues. Only queueB.queueB1 sets its own
 * minSharePreemptionTimeout; every other queue is expected to report the
 * default min share timeout.
 *
 * <p>NOTE(review): both phases set defaultFairSharePreemptionTimeout, so a
 * file that contains only the older fairSharePreemptionTimeout element is
 * not exercised by this test.
 */
@Test
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
  conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
  MockClock clock = new MockClock();
  scheduler.setClock(clock);

  writeBackwardsCompatibleAllocFile(30, 40);

  scheduler.init(conf);
  scheduler.start();
  scheduler.reinitialize(conf, resourceManager.getRMContext());

  // Check the min/fair share preemption timeout for each queue
  QueueManager queueMgr = scheduler.getQueueManager();
  assertEquals(30000, queueMgr.getQueue("root")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("default")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueA")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueB")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
      .getFairSharePreemptionTimeout());
  assertEquals(30000, queueMgr.getQueue("queueC")
      .getFairSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("root")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("default")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("queueA")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("queueB")
      .getMinSharePreemptionTimeout());
  assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
      .getMinSharePreemptionTimeout());
  assertEquals(15000, queueMgr.getQueue("queueC")
      .getMinSharePreemptionTimeout());

  // If both exist, we take the default one
  writeBackwardsCompatibleAllocFile(25, 30);

  scheduler.reinitialize(conf, resourceManager.getRMContext());

  assertEquals(25000, queueMgr.getQueue("root")
      .getFairSharePreemptionTimeout());
}

/**
 * Writes the allocation file shared by both phases of
 * {@link #testBackwardsCompatiblePreemptionConfiguration()}; only the two
 * allocation-wide fair share timeout values differ between the phases.
 *
 * @param defaultFairShareTimeout value written into the
 *     defaultFairSharePreemptionTimeout element
 * @param legacyFairShareTimeout value written into the allocation-wide
 *     fairSharePreemptionTimeout element
 * @throws IOException if the allocation file cannot be opened for writing
 */
private void writeBackwardsCompatibleAllocFile(int defaultFairShareTimeout,
    int legacyFairShareTimeout) throws IOException {
  PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
  try {
    out.println("<?xml version=\"1.0\"?>");
    out.println("<allocations>");
    out.println("<queue name=\"default\">");
    out.println("</queue>");
    out.println("<queue name=\"queueA\">");
    out.println("</queue>");
    out.println("<queue name=\"queueB\">");
    out.println("<queue name=\"queueB1\">");
    out.println("<minSharePreemptionTimeout>5</minSharePreemptionTimeout>");
    out.println("</queue>");
    out.println("<queue name=\"queueB2\">");
    out.println("</queue>");
    out.println("</queue>");
    out.println("<queue name=\"queueC\">");
    out.println("</queue>");
    out.print("<defaultMinSharePreemptionTimeout>15</defaultMinSharePreemptionTimeout>");
    out.print("<defaultFairSharePreemptionTimeout>" + defaultFairShareTimeout
        + "</defaultFairSharePreemptionTimeout>");
    out.print("<fairSharePreemptionTimeout>" + legacyFairShareTimeout
        + "</fairSharePreemptionTimeout>");
    out.println("</allocations>");
  } finally {
    // Always release the file handle, even if building the file fails.
    out.close();
  }
}
@Test (timeout = 5000) @Test (timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException { public void testMultipleContainersWaitingForReservation() throws IOException {
scheduler.init(conf); scheduler.init(conf);

View File

@ -271,6 +271,11 @@ Allocation file format
* minSharePreemptionTimeout: number of seconds the queue is under its minimum share * minSharePreemptionTimeout: number of seconds the queue is under its minimum share
before it will try to preempt containers to take resources from other queues. before it will try to preempt containers to take resources from other queues.
If not set, the queue will inherit the value from its parent queue.
* fairSharePreemptionTimeout: number of seconds the queue is under its fair share
threshold before it will try to preempt containers to take resources from other
queues. If not set, the queue will inherit the value from its parent queue.
* <<User elements>>, which represent settings governing the behavior of individual * <<User elements>>, which represent settings governing the behavior of individual
users. They can contain a single property: maxRunningApps, a limit on the users. They can contain a single property: maxRunningApps, a limit on the
@ -279,14 +284,13 @@ Allocation file format
* <<A userMaxAppsDefault element>>, which sets the default running app limit * <<A userMaxAppsDefault element>>, which sets the default running app limit
for any users whose limit is not otherwise specified. for any users whose limit is not otherwise specified.
* <<A fairSharePreemptionTimeout element>>, number of seconds a queue is under * <<A defaultFairSharePreemptionTimeout element>>, which sets the fair share
its fair share before it will try to preempt containers to take resources from preemption timeout for the root queue; overridden by fairSharePreemptionTimeout
other queues. element in root queue.
* <<A defaultMinSharePreemptionTimeout element>>, which sets the default number * <<A defaultMinSharePreemptionTimeout element>>, which sets the min share
of seconds the queue is under its minimum share before it will try to preempt preemption timeout for the root queue; overridden by minSharePreemptionTimeout
containers to take resources from other queues; overridden by element in root queue.
minSharePreemptionTimeout element in each queue if specified.
* <<A queueMaxAppsDefault element>>, which sets the default running app limit * <<A queueMaxAppsDefault element>>, which sets the default running app limit
for queues; overridden by maxRunningApps element in each queue. for queues; overridden by maxRunningApps element in each queue.