YARN-2214. FairScheduler: preemptContainerPreCheck() in FSParentQueue delays convergence towards fairness. (Ashwin Shankar via kasha)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1613464 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
100694c34e
commit
6cca715334
|
@ -47,6 +47,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-1342. Recover container tokens upon nodemanager restart. (Jason Lowe via
|
YARN-1342. Recover container tokens upon nodemanager restart. (Jason Lowe via
|
||||||
devaraj)
|
devaraj)
|
||||||
|
|
||||||
|
YARN-2214. FairScheduler: preemptContainerPreCheck() in FSParentQueue delays
|
||||||
|
convergence towards fairness. (Ashwin Shankar via kasha)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -224,16 +224,17 @@ public class FSLeafQueue extends FSQueue {
|
||||||
@Override
|
@Override
|
||||||
public RMContainer preemptContainer() {
|
public RMContainer preemptContainer() {
|
||||||
RMContainer toBePreempted = null;
|
RMContainer toBePreempted = null;
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Queue " + getName() + " is going to preempt a container " +
|
|
||||||
"from its applications.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// If this queue is not over its fair share, reject
|
// If this queue is not over its fair share, reject
|
||||||
if (!preemptContainerPreCheck()) {
|
if (!preemptContainerPreCheck()) {
|
||||||
return toBePreempted;
|
return toBePreempted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Queue " + getName() + " is going to preempt a container " +
|
||||||
|
"from its applications.");
|
||||||
|
}
|
||||||
|
|
||||||
// Choose the app that is most over fair share
|
// Choose the app that is most over fair share
|
||||||
Comparator<Schedulable> comparator = policy.getComparator();
|
Comparator<Schedulable> comparator = policy.getComparator();
|
||||||
AppSchedulable candidateSched = null;
|
AppSchedulable candidateSched = null;
|
||||||
|
@ -328,4 +329,14 @@ public class FSLeafQueue extends FSQueue {
|
||||||
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
|
SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer) {
|
||||||
// TODO Auto-generated method stub
|
// TODO Auto-generated method stub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper method to check if the queue should preempt containers
|
||||||
|
*
|
||||||
|
* @return true if check passes (can preempt) or false otherwise
|
||||||
|
*/
|
||||||
|
private boolean preemptContainerPreCheck() {
|
||||||
|
return parent.getPolicy().checkIfUsageOverFairShare(getResourceUsage(),
|
||||||
|
getFairShare());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -164,11 +164,6 @@ public class FSParentQueue extends FSQueue {
|
||||||
public RMContainer preemptContainer() {
|
public RMContainer preemptContainer() {
|
||||||
RMContainer toBePreempted = null;
|
RMContainer toBePreempted = null;
|
||||||
|
|
||||||
// If this queue is not over its fair share, reject
|
|
||||||
if (!preemptContainerPreCheck()) {
|
|
||||||
return toBePreempted;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find the childQueue which is most over fair share
|
// Find the childQueue which is most over fair share
|
||||||
FSQueue candidateQueue = null;
|
FSQueue candidateQueue = null;
|
||||||
Comparator<Schedulable> comparator = policy.getComparator();
|
Comparator<Schedulable> comparator = policy.getComparator();
|
||||||
|
|
|
@ -187,17 +187,4 @@ public abstract class FSQueue extends Schedulable implements Queue {
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Helper method to check if the queue should preempt containers
|
|
||||||
*
|
|
||||||
* @return true if check passes (can preempt) or false otherwise
|
|
||||||
*/
|
|
||||||
protected boolean preemptContainerPreCheck() {
|
|
||||||
if (this == scheduler.getQueueManager().getRootQueue()) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return parent.getPolicy()
|
|
||||||
.checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1221,6 +1221,79 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
|
scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPreemptionIsNotDelayedToNextRound() throws Exception {
|
||||||
|
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
|
||||||
|
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
|
||||||
|
|
||||||
|
MockClock clock = new MockClock();
|
||||||
|
scheduler.setClock(clock);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"queueA\">");
|
||||||
|
out.println("<weight>8</weight>");
|
||||||
|
out.println("<queue name=\"queueA1\" />");
|
||||||
|
out.println("<queue name=\"queueA2\" />");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueB\">");
|
||||||
|
out.println("<weight>2</weight>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.print("<fairSharePreemptionTimeout>10</fairSharePreemptionTimeout>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// Add a node of 8G
|
||||||
|
RMNode node1 = MockNodes.newNodeInfo(1,
|
||||||
|
Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
// Run apps in queueA.queueA1 and queueB
|
||||||
|
ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
|
||||||
|
"queueA.queueA1", "user1", 7, 1);
|
||||||
|
// createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
|
||||||
|
ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
|
||||||
|
"user2", 1, 1);
|
||||||
|
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
for (int i = 0; i < 8; i++) {
|
||||||
|
scheduler.handle(nodeUpdate1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the apps got the containers they requested
|
||||||
|
assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
||||||
|
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
||||||
|
|
||||||
|
// Now submit an app in queueA.queueA2
|
||||||
|
ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
|
||||||
|
"queueA.queueA2", "user3", 7, 1);
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
// Let 11 sec pass
|
||||||
|
clock.tick(11);
|
||||||
|
|
||||||
|
scheduler.update();
|
||||||
|
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
|
||||||
|
.getLeafQueue("queueA.queueA2", false), clock.getTime());
|
||||||
|
assertEquals(2980, toPreempt.getMemory());
|
||||||
|
|
||||||
|
// Verify that the 3 containers required by queueA2 are preempted in the same
|
||||||
|
// round
|
||||||
|
scheduler.preemptResources(toPreempt);
|
||||||
|
assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
|
||||||
|
.size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 5000)
|
||||||
/**
|
/**
|
||||||
* Tests the timing of decision to preempt tasks.
|
* Tests the timing of decision to preempt tasks.
|
||||||
|
|
Loading…
Reference in New Issue