YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator in DRF queues to prevent unnecessary thrashing. (asuresh)

(cherry picked from commit ac94ba3e18)
This commit is contained in:
Arun Suresh 2015-07-14 00:23:55 -07:00
parent c6b0bbabc7
commit 5f58be7dd4
9 changed files with 317 additions and 61 deletions

View File

@ -580,6 +580,9 @@ Release 2.8.0 - UNRELEASED
YARN-3381. Fix typo InvalidStateTransitonException. YARN-3381. Fix typo InvalidStateTransitonException.
(Brahma Reddy Battula via aajisaka) (Brahma Reddy Battula via aajisaka)
YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator
in DRF queues to prevent unnecessary thrashing. (asuresh)
Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -560,9 +560,10 @@ public class FSLeafQueue extends FSQueue {
} }
private boolean isStarved(Resource share) { private boolean isStarved(Resource share) {
Resource desiredShare = Resources.min(scheduler.getResourceCalculator(), Resource desiredShare = Resources.min(policy.getResourceCalculator(),
scheduler.getClusterResource(), share, getDemand()); scheduler.getClusterResource(), share, getDemand());
return Resources.lessThan(scheduler.getResourceCalculator(), Resource resourceUsage = getResourceUsage();
scheduler.getClusterResource(), getResourceUsage(), desiredShare); return Resources.lessThan(policy.getResourceCalculator(),
scheduler.getClusterResource(), resourceUsage, desiredShare);
} }
} }

View File

@ -371,10 +371,9 @@ public class FairScheduler extends
Resource resToPreempt = Resources.clone(Resources.none()); Resource resToPreempt = Resources.clone(Resources.none());
for (FSLeafQueue sched : queueMgr.getLeafQueues()) { for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
} }
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, if (isResourceGreaterThanNone(resToPreempt)) {
Resources.none())) {
preemptResources(resToPreempt); preemptResources(resToPreempt);
} }
} }
@ -404,8 +403,7 @@ public class FairScheduler extends
RMContainer container = warnedIter.next(); RMContainer container = warnedIter.next();
if ((container.getState() == RMContainerState.RUNNING || if ((container.getState() == RMContainerState.RUNNING ||
container.getState() == RMContainerState.ALLOCATED) && container.getState() == RMContainerState.ALLOCATED) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, isResourceGreaterThanNone(toPreempt)) {
toPreempt, Resources.none())) {
warnOrKillContainer(container); warnOrKillContainer(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource()); Resources.subtractFrom(toPreempt, container.getContainer().getResource());
} else { } else {
@ -419,8 +417,7 @@ public class FairScheduler extends
queue.resetPreemptedResources(); queue.resetPreemptedResources();
} }
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, while (isResourceGreaterThanNone(toPreempt)) {
toPreempt, Resources.none())) {
RMContainer container = RMContainer container =
getQueueManager().getRootQueue().preemptContainer(); getQueueManager().getRootQueue().preemptContainer();
if (container == null) { if (container == null) {
@ -443,6 +440,10 @@ public class FairScheduler extends
fsOpDurations.addPreemptCallDuration(duration); fsOpDurations.addPreemptCallDuration(duration);
} }
private boolean isResourceGreaterThanNone(Resource toPreempt) {
return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0);
}
protected void warnOrKillContainer(RMContainer container) { protected void warnOrKillContainer(RMContainer container) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSAppAttempt app = getSchedulerApp(appAttemptId); FSAppAttempt app = getSchedulerApp(appAttemptId);
@ -485,33 +486,34 @@ public class FairScheduler extends
* max of the two amounts (this shouldn't happen unless someone sets the * max of the two amounts (this shouldn't happen unless someone sets the
* timeouts to be identical for some reason). * timeouts to be identical for some reason).
*/ */
protected Resource resToPreempt(FSLeafQueue sched, long curTime) { protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
long minShareTimeout = sched.getMinSharePreemptionTimeout(); long minShareTimeout = sched.getMinSharePreemptionTimeout();
long fairShareTimeout = sched.getFairSharePreemptionTimeout(); long fairShareTimeout = sched.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none(); Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none(); Resource resDueToFairShare = Resources.none();
ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, Resource target = Resources.componentwiseMin(
sched.getMinShare(), sched.getDemand()); sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToMinShare = Resources.max(calc, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage())); Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
} }
if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, Resource target = Resources.componentwiseMin(
sched.getFairShare(), sched.getDemand()); sched.getFairShare(), sched.getDemand());
resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, resDueToFairShare = Resources.max(calc, clusterResource,
Resources.none(), Resources.subtract(target, sched.getResourceUsage())); Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
} }
Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, Resource deficit = Resources.max(calc, clusterResource,
resDueToMinShare, resDueToFairShare); resDueToMinShare, resDueToFairShare);
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, if (Resources.greaterThan(calc, clusterResource,
resToPreempt, Resources.none())) { deficit, Resources.none())) {
String message = "Should preempt " + resToPreempt + " res for queue " String message = "Should preempt " + deficit + " res for queue "
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare + sched.getName() + ": resDueToMinShare = " + resDueToMinShare
+ ", resDueToFairShare = " + resDueToFairShare; + ", resDueToFairShare = " + resDueToFairShare;
LOG.info(message); LOG.info(message);
} }
return resToPreempt; return deficit;
} }
public synchronized RMContainerTokenSecretManager public synchronized RMContainerTokenSecretManager

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import java.util.Collection; import java.util.Collection;
import java.util.Comparator; import java.util.Comparator;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -97,6 +100,14 @@ public abstract class SchedulingPolicy {
public void initialize(Resource clusterCapacity) {} public void initialize(Resource clusterCapacity) {}
/**
* The {@link ResourceCalculator} returned by this method should be used
* for any calculations involving resources.
*
* @return ResourceCalculator instance to use
*/
public abstract ResourceCalculator getResourceCalculator();
/** /**
* @return returns the name of {@link SchedulingPolicy} * @return returns the name of {@link SchedulingPolicy}
*/ */

View File

@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*; import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
@ -44,8 +47,10 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
public static final String NAME = "DRF"; public static final String NAME = "DRF";
private DominantResourceFairnessComparator comparator = private static final DominantResourceFairnessComparator COMPARATOR =
new DominantResourceFairnessComparator(); new DominantResourceFairnessComparator();
private static final DominantResourceCalculator CALCULATOR =
new DominantResourceCalculator();
@Override @Override
public String getName() { public String getName() {
@ -59,7 +64,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
@Override @Override
public Comparator<Schedulable> getComparator() { public Comparator<Schedulable> getComparator() {
return comparator; return COMPARATOR;
}
@Override
public ResourceCalculator getResourceCalculator() {
return CALCULATOR;
} }
@Override @Override
@ -105,7 +115,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
@Override @Override
public void initialize(Resource clusterCapacity) { public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity); COMPARATOR.setClusterCapacity(clusterCapacity);
} }
public static class DominantResourceFairnessComparator implements Comparator<Schedulable> { public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -43,7 +44,8 @@ public class FairSharePolicy extends SchedulingPolicy {
public static final String NAME = "fair"; public static final String NAME = "fair";
private static final DefaultResourceCalculator RESOURCE_CALCULATOR = private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
new DefaultResourceCalculator(); new DefaultResourceCalculator();
private FairShareComparator comparator = new FairShareComparator(); private static final FairShareComparator COMPARATOR =
new FairShareComparator();
@Override @Override
public String getName() { public String getName() {
@ -111,7 +113,12 @@ public class FairSharePolicy extends SchedulingPolicy {
@Override @Override
public Comparator<Schedulable> getComparator() { public Comparator<Schedulable> getComparator() {
return comparator; return COMPARATOR;
}
@Override
public ResourceCalculator getResourceCalculator() {
return RESOURCE_CALCULATOR;
} }
@Override @Override

View File

@ -27,6 +27,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -36,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting;
public class FifoPolicy extends SchedulingPolicy { public class FifoPolicy extends SchedulingPolicy {
@VisibleForTesting @VisibleForTesting
public static final String NAME = "FIFO"; public static final String NAME = "FIFO";
private FifoComparator comparator = new FifoComparator(); private static final FifoComparator COMPARATOR = new FifoComparator();
private static final DefaultResourceCalculator CALCULATOR =
new DefaultResourceCalculator();
@Override @Override
public String getName() { public String getName() {
@ -68,7 +74,12 @@ public class FifoPolicy extends SchedulingPolicy {
@Override @Override
public Comparator<Schedulable> getComparator() { public Comparator<Schedulable> getComparator() {
return comparator; return COMPARATOR;
}
@Override
public ResourceCalculator getResourceCalculator() {
return CALCULATOR;
} }
@Override @Override

View File

@ -233,6 +233,70 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
assertFalse(queueB2.isStarvedForFairShare()); assertFalse(queueB2.isStarvedForFairShare());
} }
@Test (timeout = 5000)
public void testIsStarvedForFairShareDRF() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.5</weight>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.5</weight>");
out.println("</queue>");
out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
resourceManager = new MockRM(conf);
resourceManager.start();
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
// Add one big node (only care about aggregate capacity)
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
scheduler.update();
// Queue A wants 7 * 1024, 1. Node update gives this all to A
createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeEvent2);
QueueManager queueMgr = scheduler.getQueueManager();
FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
assertEquals(7 * 1024, queueA.getResourceUsage().getMemory());
assertEquals(1, queueA.getResourceUsage().getVirtualCores());
// Queue B has 3 reqs :
// 1) 2 * 1024, 5 .. which will be granted
// 2) 1 * 1024, 1 .. which will be granted
// 3) 1 * 1024, 1 .. which wont
createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
scheduler.update();
for (int i = 0; i < 3; i ++) {
scheduler.handle(nodeEvent2);
}
FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
assertEquals(3 * 1024, queueB.getResourceUsage().getMemory());
assertEquals(6, queueB.getResourceUsage().getVirtualCores());
scheduler.update();
// Verify that Queue us not starved for fair share..
// Since the Starvation logic now uses DRF when the policy = drf, The
// Queue should not be starved
assertFalse(queueB.isStarvedForFairShare());
}
@Test @Test
public void testConcurrentAccess() { public void testConcurrentAccess() {
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");

View File

@ -100,7 +100,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -1706,7 +1705,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
clock.tickSec(11); clock.tickSec(11);
scheduler.update(); scheduler.update();
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
.getLeafQueue("queueA.queueA2", false), clock.getTime()); .getLeafQueue("queueA.queueA2", false), clock.getTime());
assertEquals(3277, toPreempt.getMemory()); assertEquals(3277, toPreempt.getMemory());
@ -1829,25 +1828,173 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.getQueueManager().getLeafQueue("queueD", true); scheduler.getQueueManager().getLeafQueue("queueD", true);
assertTrue(Resources.equals( assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(schedC, clock.getTime()))); Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
assertTrue(Resources.equals( assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(schedD, clock.getTime()))); Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
// After minSharePreemptionTime has passed, they should want to preempt min // After minSharePreemptionTime has passed, they should want to preempt min
// share. // share.
clock.tickSec(6); clock.tickSec(6);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
assertEquals( assertEquals(
1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
// After fairSharePreemptionTime has passed, they should want to preempt // After fairSharePreemptionTime has passed, they should want to preempt
// fair share. // fair share.
scheduler.update(); scheduler.update();
clock.tickSec(6); clock.tickSec(6);
assertEquals( assertEquals(
1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
assertEquals( assertEquals(
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
}
@Test
/**
* Tests the timing of decision to preempt tasks.
*/
public void testPreemptionDecisionWithDRF() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
ControlledClock clock = new ControlledClock();
scheduler.setClock(clock);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"default\">");
out.println("<maxResources>0mb,0vcores</maxResources>");
out.println("</queue>");
out.println("<queue name=\"queueA\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024mb,1vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueB\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024mb,2vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024mb,3vcores</minResources>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<weight>.25</weight>");
out.println("<minResources>1024mb,2vcores</minResources>");
out.println("</queue>");
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create four nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Queue A and B each request three containers
ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
ApplicationAttemptId app2 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
ApplicationAttemptId app3 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
ApplicationAttemptId app4 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
ApplicationAttemptId app5 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
ApplicationAttemptId app6 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
scheduler.update();
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(nodeUpdate1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
scheduler.handle(nodeUpdate3);
}
// Now new requests arrive from queues C and D
ApplicationAttemptId app7 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
ApplicationAttemptId app8 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
ApplicationAttemptId app9 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
ApplicationAttemptId app10 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
ApplicationAttemptId app11 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
ApplicationAttemptId app12 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
scheduler.update();
FSLeafQueue schedC =
scheduler.getQueueManager().getLeafQueue("queueC", true);
FSLeafQueue schedD =
scheduler.getQueueManager().getLeafQueue("queueD", true);
assertTrue(Resources.equals(
Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
assertTrue(Resources.equals(
Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
// Test :
// 1) whether componentWise min works as expected.
// 2) DRF calculator is used
// After minSharePreemptionTime has passed, they should want to preempt min
// share.
clock.tickSec(6);
Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
assertEquals(1024, res.getMemory());
// Demand = 3
assertEquals(3, res.getVirtualCores());
res = scheduler.resourceDeficit(schedD, clock.getTime());
assertEquals(1024, res.getMemory());
// Demand = 6, but min share = 2
assertEquals(2, res.getVirtualCores());
// After fairSharePreemptionTime has passed, they should want to preempt
// fair share.
scheduler.update();
clock.tickSec(6);
res = scheduler.resourceDeficit(schedC, clock.getTime());
assertEquals(1536, res.getMemory());
assertEquals(3, res.getVirtualCores());
res = scheduler.resourceDeficit(schedD, clock.getTime());
assertEquals(1536, res.getMemory());
// Demand = 6, but fair share = 3
assertEquals(3, res.getVirtualCores());
} }
@Test @Test
@ -1964,71 +2111,71 @@ public class TestFairScheduler extends FairSchedulerTestBase {
FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true); FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
assertTrue(Resources.equals( assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime()))); Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
assertTrue(Resources.equals( assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime()))); Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
assertTrue(Resources.equals( assertTrue(Resources.equals(
Resources.none(), scheduler.resToPreempt(queueC, clock.getTime()))); Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
// After 5 seconds, queueB1 wants to preempt min share // After 5 seconds, queueB1 wants to preempt min share
scheduler.update(); scheduler.update();
clock.tickSec(6); clock.tickSec(6);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
assertEquals( assertEquals(
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
// After 10 seconds, queueB2 wants to preempt min share // After 10 seconds, queueB2 wants to preempt min share
scheduler.update(); scheduler.update();
clock.tickSec(5); clock.tickSec(5);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
assertEquals( assertEquals(
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
// After 15 seconds, queueC wants to preempt min share // After 15 seconds, queueC wants to preempt min share
scheduler.update(); scheduler.update();
clock.tickSec(5); clock.tickSec(5);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
// After 20 seconds, queueB2 should want to preempt fair share // After 20 seconds, queueB2 should want to preempt fair share
scheduler.update(); scheduler.update();
clock.tickSec(5); clock.tickSec(5);
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
// After 25 seconds, queueB1 should want to preempt fair share // After 25 seconds, queueB1 should want to preempt fair share
scheduler.update(); scheduler.update();
clock.tickSec(5); clock.tickSec(5);
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
assertEquals( assertEquals(
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
// After 30 seconds, queueC should want to preempt fair share // After 30 seconds, queueC should want to preempt fair share
scheduler.update(); scheduler.update();
clock.tickSec(5); clock.tickSec(5);
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
assertEquals( assertEquals(
1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
} }
@Test @Test