YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator in DRF queues to prevent unnecessary thrashing. (asuresh)
This commit is contained in:
parent
a431ed9075
commit
ac94ba3e18
|
@ -628,6 +628,9 @@ Release 2.8.0 - UNRELEASED
|
|||
YARN-3381. Fix typo InvalidStateTransitonException.
|
||||
(Brahma Reddy Battula via aajisaka)
|
||||
|
||||
YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator
|
||||
in DRF queues to prevent unnecessary thrashing. (asuresh)
|
||||
|
||||
Release 2.7.2 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -560,9 +560,10 @@ public class FSLeafQueue extends FSQueue {
|
|||
}
|
||||
|
||||
private boolean isStarved(Resource share) {
|
||||
Resource desiredShare = Resources.min(scheduler.getResourceCalculator(),
|
||||
Resource desiredShare = Resources.min(policy.getResourceCalculator(),
|
||||
scheduler.getClusterResource(), share, getDemand());
|
||||
return Resources.lessThan(scheduler.getResourceCalculator(),
|
||||
scheduler.getClusterResource(), getResourceUsage(), desiredShare);
|
||||
Resource resourceUsage = getResourceUsage();
|
||||
return Resources.lessThan(policy.getResourceCalculator(),
|
||||
scheduler.getClusterResource(), resourceUsage, desiredShare);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -371,10 +371,9 @@ public class FairScheduler extends
|
|||
|
||||
Resource resToPreempt = Resources.clone(Resources.none());
|
||||
for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
|
||||
Resources.addTo(resToPreempt, resToPreempt(sched, curTime));
|
||||
Resources.addTo(resToPreempt, resourceDeficit(sched, curTime));
|
||||
}
|
||||
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
|
||||
Resources.none())) {
|
||||
if (isResourceGreaterThanNone(resToPreempt)) {
|
||||
preemptResources(resToPreempt);
|
||||
}
|
||||
}
|
||||
|
@ -404,8 +403,7 @@ public class FairScheduler extends
|
|||
RMContainer container = warnedIter.next();
|
||||
if ((container.getState() == RMContainerState.RUNNING ||
|
||||
container.getState() == RMContainerState.ALLOCATED) &&
|
||||
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
|
||||
toPreempt, Resources.none())) {
|
||||
isResourceGreaterThanNone(toPreempt)) {
|
||||
warnOrKillContainer(container);
|
||||
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
|
||||
} else {
|
||||
|
@ -419,8 +417,7 @@ public class FairScheduler extends
|
|||
queue.resetPreemptedResources();
|
||||
}
|
||||
|
||||
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
|
||||
toPreempt, Resources.none())) {
|
||||
while (isResourceGreaterThanNone(toPreempt)) {
|
||||
RMContainer container =
|
||||
getQueueManager().getRootQueue().preemptContainer();
|
||||
if (container == null) {
|
||||
|
@ -443,6 +440,10 @@ public class FairScheduler extends
|
|||
fsOpDurations.addPreemptCallDuration(duration);
|
||||
}
|
||||
|
||||
private boolean isResourceGreaterThanNone(Resource toPreempt) {
|
||||
return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0);
|
||||
}
|
||||
|
||||
protected void warnOrKillContainer(RMContainer container) {
|
||||
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
|
||||
FSAppAttempt app = getSchedulerApp(appAttemptId);
|
||||
|
@ -485,33 +486,34 @@ public class FairScheduler extends
|
|||
* max of the two amounts (this shouldn't happen unless someone sets the
|
||||
* timeouts to be identical for some reason).
|
||||
*/
|
||||
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
|
||||
protected Resource resourceDeficit(FSLeafQueue sched, long curTime) {
|
||||
long minShareTimeout = sched.getMinSharePreemptionTimeout();
|
||||
long fairShareTimeout = sched.getFairSharePreemptionTimeout();
|
||||
Resource resDueToMinShare = Resources.none();
|
||||
Resource resDueToFairShare = Resources.none();
|
||||
ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
|
||||
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
|
||||
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
|
||||
Resource target = Resources.componentwiseMin(
|
||||
sched.getMinShare(), sched.getDemand());
|
||||
resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
|
||||
resDueToMinShare = Resources.max(calc, clusterResource,
|
||||
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
|
||||
}
|
||||
if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) {
|
||||
Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource,
|
||||
Resource target = Resources.componentwiseMin(
|
||||
sched.getFairShare(), sched.getDemand());
|
||||
resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource,
|
||||
resDueToFairShare = Resources.max(calc, clusterResource,
|
||||
Resources.none(), Resources.subtract(target, sched.getResourceUsage()));
|
||||
}
|
||||
Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource,
|
||||
Resource deficit = Resources.max(calc, clusterResource,
|
||||
resDueToMinShare, resDueToFairShare);
|
||||
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
|
||||
resToPreempt, Resources.none())) {
|
||||
String message = "Should preempt " + resToPreempt + " res for queue "
|
||||
if (Resources.greaterThan(calc, clusterResource,
|
||||
deficit, Resources.none())) {
|
||||
String message = "Should preempt " + deficit + " res for queue "
|
||||
+ sched.getName() + ": resDueToMinShare = " + resDueToMinShare
|
||||
+ ", resDueToFairShare = " + resDueToFairShare;
|
||||
LOG.info(message);
|
||||
}
|
||||
return resToPreempt;
|
||||
return deficit;
|
||||
}
|
||||
|
||||
public synchronized RMContainerTokenSecretManager
|
||||
|
|
|
@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
@ -97,6 +100,14 @@ public abstract class SchedulingPolicy {
|
|||
|
||||
public void initialize(Resource clusterCapacity) {}
|
||||
|
||||
/**
|
||||
* The {@link ResourceCalculator} returned by this method should be used
|
||||
* for any calculations involving resources.
|
||||
*
|
||||
* @return ResourceCalculator instance to use
|
||||
*/
|
||||
public abstract ResourceCalculator getResourceCalculator();
|
||||
|
||||
/**
|
||||
* @return returns the name of {@link SchedulingPolicy}
|
||||
*/
|
||||
|
|
|
@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*;
|
||||
|
@ -44,8 +47,10 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|||
|
||||
public static final String NAME = "DRF";
|
||||
|
||||
private DominantResourceFairnessComparator comparator =
|
||||
private static final DominantResourceFairnessComparator COMPARATOR =
|
||||
new DominantResourceFairnessComparator();
|
||||
private static final DominantResourceCalculator CALCULATOR =
|
||||
new DominantResourceCalculator();
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
|
@ -59,7 +64,12 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|||
|
||||
@Override
|
||||
public Comparator<Schedulable> getComparator() {
|
||||
return comparator;
|
||||
return COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceCalculator getResourceCalculator() {
|
||||
return CALCULATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -105,7 +115,7 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
|
|||
|
||||
@Override
|
||||
public void initialize(Resource clusterCapacity) {
|
||||
comparator.setClusterCapacity(clusterCapacity);
|
||||
COMPARATOR.setClusterCapacity(clusterCapacity);
|
||||
}
|
||||
|
||||
public static class DominantResourceFairnessComparator implements Comparator<Schedulable> {
|
||||
|
|
|
@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -43,7 +44,8 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
public static final String NAME = "fair";
|
||||
private static final DefaultResourceCalculator RESOURCE_CALCULATOR =
|
||||
new DefaultResourceCalculator();
|
||||
private FairShareComparator comparator = new FairShareComparator();
|
||||
private static final FairShareComparator COMPARATOR =
|
||||
new FairShareComparator();
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
|
@ -111,7 +113,12 @@ public class FairSharePolicy extends SchedulingPolicy {
|
|||
|
||||
@Override
|
||||
public Comparator<Schedulable> getComparator() {
|
||||
return comparator;
|
||||
return COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceCalculator getResourceCalculator() {
|
||||
return RESOURCE_CALCULATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -27,6 +27,10 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy;
|
||||
|
||||
|
||||
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
@ -36,7 +40,9 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
public class FifoPolicy extends SchedulingPolicy {
|
||||
@VisibleForTesting
|
||||
public static final String NAME = "FIFO";
|
||||
private FifoComparator comparator = new FifoComparator();
|
||||
private static final FifoComparator COMPARATOR = new FifoComparator();
|
||||
private static final DefaultResourceCalculator CALCULATOR =
|
||||
new DefaultResourceCalculator();
|
||||
|
||||
@Override
|
||||
public String getName() {
|
||||
|
@ -68,7 +74,12 @@ public class FifoPolicy extends SchedulingPolicy {
|
|||
|
||||
@Override
|
||||
public Comparator<Schedulable> getComparator() {
|
||||
return comparator;
|
||||
return COMPARATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResourceCalculator getResourceCalculator() {
|
||||
return CALCULATOR;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -233,6 +233,70 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
|
|||
assertFalse(queueB2.isStarvedForFairShare());
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
public void testIsStarvedForFairShareDRF() throws Exception {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<weight>.5</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueB\">");
|
||||
out.println("<weight>.5</weight>");
|
||||
out.println("</queue>");
|
||||
out.println("<defaultFairSharePreemptionThreshold>1</defaultFairSharePreemptionThreshold>");
|
||||
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
resourceManager = new MockRM(conf);
|
||||
resourceManager.start();
|
||||
scheduler = (FairScheduler) resourceManager.getResourceScheduler();
|
||||
|
||||
// Add one big node (only care about aggregate capacity)
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// Queue A wants 7 * 1024, 1. Node update gives this all to A
|
||||
createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1);
|
||||
scheduler.update();
|
||||
NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
QueueManager queueMgr = scheduler.getQueueManager();
|
||||
FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false);
|
||||
assertEquals(7 * 1024, queueA.getResourceUsage().getMemory());
|
||||
assertEquals(1, queueA.getResourceUsage().getVirtualCores());
|
||||
|
||||
// Queue B has 3 reqs :
|
||||
// 1) 2 * 1024, 5 .. which will be granted
|
||||
// 2) 1 * 1024, 1 .. which will be granted
|
||||
// 3) 1 * 1024, 1 .. which wont
|
||||
createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1);
|
||||
createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2);
|
||||
scheduler.update();
|
||||
for (int i = 0; i < 3; i ++) {
|
||||
scheduler.handle(nodeEvent2);
|
||||
}
|
||||
|
||||
FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false);
|
||||
assertEquals(3 * 1024, queueB.getResourceUsage().getMemory());
|
||||
assertEquals(6, queueB.getResourceUsage().getVirtualCores());
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// Verify that Queue us not starved for fair share..
|
||||
// Since the Starvation logic now uses DRF when the policy = drf, The
|
||||
// Queue should not be starved
|
||||
assertFalse(queueB.isStarvedForFairShare());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConcurrentAccess() {
|
||||
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
|
||||
|
|
|
@ -100,7 +100,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.Dom
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.ControlledClock;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
|
@ -1706,7 +1705,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
clock.tickSec(11);
|
||||
|
||||
scheduler.update();
|
||||
Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager()
|
||||
Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
|
||||
.getLeafQueue("queueA.queueA2", false), clock.getTime());
|
||||
assertEquals(3277, toPreempt.getMemory());
|
||||
|
||||
|
@ -1829,25 +1828,173 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
scheduler.getQueueManager().getLeafQueue("queueD", true);
|
||||
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resToPreempt(schedC, clock.getTime())));
|
||||
Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resToPreempt(schedD, clock.getTime())));
|
||||
Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
|
||||
// After minSharePreemptionTime has passed, they should want to preempt min
|
||||
// share.
|
||||
clock.tickSec(6);
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
|
||||
|
||||
// After fairSharePreemptionTime has passed, they should want to preempt
|
||||
// fair share.
|
||||
scheduler.update();
|
||||
clock.tickSec(6);
|
||||
assertEquals(
|
||||
1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory());
|
||||
1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
/**
|
||||
* Tests the timing of decision to preempt tasks.
|
||||
*/
|
||||
public void testPreemptionDecisionWithDRF() throws Exception {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
ControlledClock clock = new ControlledClock();
|
||||
scheduler.setClock(clock);
|
||||
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"default\">");
|
||||
out.println("<maxResources>0mb,0vcores</maxResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueA\">");
|
||||
out.println("<weight>.25</weight>");
|
||||
out.println("<minResources>1024mb,1vcores</minResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueB\">");
|
||||
out.println("<weight>.25</weight>");
|
||||
out.println("<minResources>1024mb,2vcores</minResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueC\">");
|
||||
out.println("<weight>.25</weight>");
|
||||
out.println("<minResources>1024mb,3vcores</minResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queueD\">");
|
||||
out.println("<weight>.25</weight>");
|
||||
out.println("<minResources>1024mb,2vcores</minResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
||||
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
||||
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
||||
out.println("<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Create four nodes
|
||||
RMNode node1 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
|
||||
"127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
RMNode node2 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
|
||||
"127.0.0.2");
|
||||
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||
scheduler.handle(nodeEvent2);
|
||||
|
||||
RMNode node3 =
|
||||
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
|
||||
"127.0.0.3");
|
||||
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
||||
scheduler.handle(nodeEvent3);
|
||||
|
||||
// Queue A and B each request three containers
|
||||
ApplicationAttemptId app1 =
|
||||
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
|
||||
ApplicationAttemptId app2 =
|
||||
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
|
||||
ApplicationAttemptId app3 =
|
||||
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
|
||||
|
||||
ApplicationAttemptId app4 =
|
||||
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
|
||||
ApplicationAttemptId app5 =
|
||||
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
|
||||
ApplicationAttemptId app6 =
|
||||
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
|
||||
|
||||
scheduler.update();
|
||||
|
||||
// Sufficient node check-ins to fully schedule containers
|
||||
for (int i = 0; i < 2; i++) {
|
||||
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(nodeUpdate1);
|
||||
|
||||
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
||||
scheduler.handle(nodeUpdate2);
|
||||
|
||||
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
||||
scheduler.handle(nodeUpdate3);
|
||||
}
|
||||
|
||||
// Now new requests arrive from queues C and D
|
||||
ApplicationAttemptId app7 =
|
||||
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
|
||||
ApplicationAttemptId app8 =
|
||||
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
|
||||
ApplicationAttemptId app9 =
|
||||
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
|
||||
|
||||
ApplicationAttemptId app10 =
|
||||
createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
|
||||
ApplicationAttemptId app11 =
|
||||
createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
|
||||
ApplicationAttemptId app12 =
|
||||
createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
|
||||
|
||||
scheduler.update();
|
||||
|
||||
FSLeafQueue schedC =
|
||||
scheduler.getQueueManager().getLeafQueue("queueC", true);
|
||||
FSLeafQueue schedD =
|
||||
scheduler.getQueueManager().getLeafQueue("queueD", true);
|
||||
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
|
||||
|
||||
// Test :
|
||||
// 1) whether componentWise min works as expected.
|
||||
// 2) DRF calculator is used
|
||||
|
||||
// After minSharePreemptionTime has passed, they should want to preempt min
|
||||
// share.
|
||||
clock.tickSec(6);
|
||||
Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
|
||||
assertEquals(1024, res.getMemory());
|
||||
// Demand = 3
|
||||
assertEquals(3, res.getVirtualCores());
|
||||
|
||||
res = scheduler.resourceDeficit(schedD, clock.getTime());
|
||||
assertEquals(1024, res.getMemory());
|
||||
// Demand = 6, but min share = 2
|
||||
assertEquals(2, res.getVirtualCores());
|
||||
|
||||
// After fairSharePreemptionTime has passed, they should want to preempt
|
||||
// fair share.
|
||||
scheduler.update();
|
||||
clock.tickSec(6);
|
||||
res = scheduler.resourceDeficit(schedC, clock.getTime());
|
||||
assertEquals(1536, res.getMemory());
|
||||
assertEquals(3, res.getVirtualCores());
|
||||
|
||||
res = scheduler.resourceDeficit(schedD, clock.getTime());
|
||||
assertEquals(1536, res.getMemory());
|
||||
// Demand = 6, but fair share = 3
|
||||
assertEquals(3, res.getVirtualCores());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -1964,71 +2111,71 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
|
||||
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime())));
|
||||
Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime())));
|
||||
Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
|
||||
assertTrue(Resources.equals(
|
||||
Resources.none(), scheduler.resToPreempt(queueC, clock.getTime())));
|
||||
Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
|
||||
|
||||
// After 5 seconds, queueB1 wants to preempt min share
|
||||
scheduler.update();
|
||||
clock.tickSec(6);
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
|
||||
0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
|
||||
0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||
|
||||
// After 10 seconds, queueB2 wants to preempt min share
|
||||
scheduler.update();
|
||||
clock.tickSec(5);
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
|
||||
0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||
|
||||
// After 15 seconds, queueC wants to preempt min share
|
||||
scheduler.update();
|
||||
clock.tickSec(5);
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||
|
||||
// After 20 seconds, queueB2 should want to preempt fair share
|
||||
scheduler.update();
|
||||
clock.tickSec(5);
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||
|
||||
// After 25 seconds, queueB1 should want to preempt fair share
|
||||
scheduler.update();
|
||||
clock.tickSec(5);
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
|
||||
1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||
|
||||
// After 30 seconds, queueC should want to preempt fair share
|
||||
scheduler.update();
|
||||
clock.tickSec(5);
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
|
||||
assertEquals(
|
||||
1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory());
|
||||
1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue