From 5f58be7dd4b68862914fac912216ecb30a0f12ed Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Tue, 14 Jul 2015 00:23:55 -0700 Subject: [PATCH] YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator in DRF queues to prevent unnecessary thrashing. (asuresh) (cherry picked from commit ac94ba3e185115b83351e35c610c2b8ff91b1ebc) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSLeafQueue.java | 9 +- .../scheduler/fair/FairScheduler.java | 40 ++-- .../scheduler/fair/SchedulingPolicy.java | 11 + .../DominantResourceFairnessPolicy.java | 18 +- .../fair/policies/FairSharePolicy.java | 11 +- .../scheduler/fair/policies/FifoPolicy.java | 15 +- .../scheduler/fair/TestFSLeafQueue.java | 64 ++++++ .../scheduler/fair/TestFairScheduler.java | 207 +++++++++++++++--- 9 files changed, 317 insertions(+), 61 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 090861b27ca..2f52af7fd6c 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -580,6 +580,9 @@ Release 2.8.0 - UNRELEASED YARN-3381. Fix typo InvalidStateTransitonException. (Brahma Reddy Battula via aajisaka) + YARN-3453. Ensure preemption logic in FairScheduler uses DominantResourceCalculator + in DRF queues to prevent unnecessary thrashing. (asuresh) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 6779a1b3777..f90a198cb4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -560,9 +560,10 @@ boolean isStarvedForFairShare() { } private boolean isStarved(Resource share) { - Resource desiredShare = Resources.min(scheduler.getResourceCalculator(), - scheduler.getClusterResource(), share, getDemand()); - return Resources.lessThan(scheduler.getResourceCalculator(), - scheduler.getClusterResource(), getResourceUsage(), desiredShare); + Resource desiredShare = Resources.min(policy.getResourceCalculator(), + scheduler.getClusterResource(), share, getDemand()); + Resource resourceUsage = getResourceUsage(); + return Resources.lessThan(policy.getResourceCalculator(), + scheduler.getClusterResource(), resourceUsage, desiredShare); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index cbc10e7c1b8..efe654455ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -371,10 +371,9 @@ protected synchronized void preemptTasksIfNecessary() { Resource resToPreempt = Resources.clone(Resources.none()); for (FSLeafQueue sched : queueMgr.getLeafQueues()) { - Resources.addTo(resToPreempt, resToPreempt(sched, curTime)); + Resources.addTo(resToPreempt, resourceDeficit(sched, curTime)); } - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, - Resources.none())) { + if (isResourceGreaterThanNone(resToPreempt)) { preemptResources(resToPreempt); } } @@ -404,8 +403,7 @@ protected void preemptResources(Resource toPreempt) { RMContainer container = warnedIter.next(); if ((container.getState() == RMContainerState.RUNNING || container.getState() == RMContainerState.ALLOCATED) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, - toPreempt, Resources.none())) { + isResourceGreaterThanNone(toPreempt)) { warnOrKillContainer(container); Resources.subtractFrom(toPreempt, container.getContainer().getResource()); } else { @@ -419,8 +417,7 @@ protected void preemptResources(Resource toPreempt) { queue.resetPreemptedResources(); } - while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, - toPreempt, Resources.none())) { + while (isResourceGreaterThanNone(toPreempt)) { RMContainer container = getQueueManager().getRootQueue().preemptContainer(); if (container == null) { @@ -442,7 +439,11 @@ protected void preemptResources(Resource toPreempt) { long duration = getClock().getTime() - start; fsOpDurations.addPreemptCallDuration(duration); } - + + private boolean isResourceGreaterThanNone(Resource toPreempt) { + return (toPreempt.getMemory() > 0) || (toPreempt.getVirtualCores() > 0); + } + protected void warnOrKillContainer(RMContainer container) { ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); FSAppAttempt app = getSchedulerApp(appAttemptId); @@ -485,33 +486,34 @@ protected void warnOrKillContainer(RMContainer container) { * max of the two amounts (this shouldn't happen unless someone sets the * timeouts to be identical for some reason). */ - protected Resource resToPreempt(FSLeafQueue sched, long curTime) { + protected Resource resourceDeficit(FSLeafQueue sched, long curTime) { long minShareTimeout = sched.getMinSharePreemptionTimeout(); long fairShareTimeout = sched.getFairSharePreemptionTimeout(); Resource resDueToMinShare = Resources.none(); Resource resDueToFairShare = Resources.none(); + ResourceCalculator calc = sched.getPolicy().getResourceCalculator(); if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, + Resource target = Resources.componentwiseMin( sched.getMinShare(), sched.getDemand()); - resDueToMinShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, + resDueToMinShare = Resources.max(calc, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } if (curTime - sched.getLastTimeAtFairShareThreshold() > fairShareTimeout) { - Resource target = Resources.min(RESOURCE_CALCULATOR, clusterResource, - sched.getFairShare(), sched.getDemand()); - resDueToFairShare = Resources.max(RESOURCE_CALCULATOR, clusterResource, + Resource target = Resources.componentwiseMin( + sched.getFairShare(), sched.getDemand()); + resDueToFairShare = Resources.max(calc, clusterResource, Resources.none(), Resources.subtract(target, sched.getResourceUsage())); } - Resource resToPreempt = Resources.max(RESOURCE_CALCULATOR, clusterResource, + Resource deficit = Resources.max(calc, clusterResource, resDueToMinShare, resDueToFairShare); - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, - resToPreempt, Resources.none())) { - String message = "Should preempt " + resToPreempt + " res for queue " + if (Resources.greaterThan(calc, clusterResource, + deficit, Resources.none())) { + String message = "Should preempt " + deficit + " res for queue " + sched.getName() + ": resDueToMinShare = " + resDueToMinShare + ", resDueToFairShare = " + resDueToFairShare; LOG.info(message); } - return resToPreempt; + return deficit; } public synchronized RMContainerTokenSecretManager diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index abdc834d8f6..160ba4b5d82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; + +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; + import java.util.Collection; import java.util.Comparator; import java.util.concurrent.ConcurrentHashMap; @@ -97,6 +100,14 @@ public static SchedulingPolicy parse(String policy) public void initialize(Resource clusterCapacity) {} + /** + * The {@link ResourceCalculator} returned by this method should be used + * for any calculations involving resources. + * + * @return ResourceCalculator instance to use + */ + public abstract ResourceCalculator getResourceCalculator(); + /** * @return returns the name of {@link SchedulingPolicy} */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index 86d503ba065..45fbf982832 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; + +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import static org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType.*; @@ -44,8 +47,10 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { public static final String NAME = "DRF"; - private DominantResourceFairnessComparator comparator = + private static final DominantResourceFairnessComparator COMPARATOR = new DominantResourceFairnessComparator(); + private static final DominantResourceCalculator CALCULATOR = + new DominantResourceCalculator(); @Override public String getName() { @@ -59,9 +64,14 @@ public byte getApplicableDepth() { @Override public Comparator getComparator() { - return comparator; + return COMPARATOR; } - + + @Override + public ResourceCalculator getResourceCalculator() { + return CALCULATOR; + } + @Override public void computeShares(Collection schedulables, Resource totalResources) { @@ -105,7 +115,7 @@ public Resource getHeadroom(Resource queueFairShare, Resource queueUsage, @Override public void initialize(Resource clusterCapacity) { - comparator.setClusterCapacity(clusterCapacity); + COMPARATOR.setClusterCapacity(clusterCapacity); } public static class DominantResourceFairnessComparator implements Comparator { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 918db9d82d6..3b9f07fb864 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -43,7 +44,8 @@ public class FairSharePolicy extends SchedulingPolicy { public static final String NAME = "fair"; private static final DefaultResourceCalculator RESOURCE_CALCULATOR = new DefaultResourceCalculator(); - private FairShareComparator comparator = new FairShareComparator(); + private static final FairShareComparator COMPARATOR = + new FairShareComparator(); @Override public String getName() { @@ -111,7 +113,12 @@ else if (s1Needy && s2Needy) @Override public Comparator getComparator() { - return comparator; + return COMPARATOR; + } + + @Override + public ResourceCalculator getResourceCalculator() { + return RESOURCE_CALCULATOR; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 7d889339b1d..a644e584ed4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -27,6 +27,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; + + +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -36,7 +40,9 @@ public class FifoPolicy extends SchedulingPolicy { @VisibleForTesting public static final String NAME = "FIFO"; - private FifoComparator comparator = new FifoComparator(); + private static final FifoComparator COMPARATOR = new FifoComparator(); + private static final DefaultResourceCalculator CALCULATOR = + new DefaultResourceCalculator(); @Override public String getName() { @@ -68,7 +74,12 @@ public int compare(Schedulable s1, Schedulable s2) { @Override public Comparator getComparator() { - return comparator; + return COMPARATOR; + } + + @Override + public ResourceCalculator getResourceCalculator() { + return CALCULATOR; } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java index 385ea0be76b..76374102ebf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java @@ -233,6 +233,70 @@ public void testIsStarvedForFairShare() throws Exception { assertFalse(queueB2.isStarvedForFairShare()); } + @Test (timeout = 5000) + public void testIsStarvedForFairShareDRF() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println(".5"); + out.println(""); + out.println(""); + out.println(".5"); + out.println(""); + out.println("1"); + out.println("drf"); + out.println(""); + out.close(); + + resourceManager = new MockRM(conf); + resourceManager.start(); + scheduler = (FairScheduler) resourceManager.getResourceScheduler(); + + // Add one big node (only care about aggregate capacity) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(10 * 1024, 10), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + scheduler.update(); + + // Queue A wants 7 * 1024, 1. Node update gives this all to A + createSchedulingRequest(7 * 1024, 1, "queueA", "user1", 1); + scheduler.update(); + NodeUpdateSchedulerEvent nodeEvent2 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + + QueueManager queueMgr = scheduler.getQueueManager(); + FSLeafQueue queueA = queueMgr.getLeafQueue("queueA", false); + assertEquals(7 * 1024, queueA.getResourceUsage().getMemory()); + assertEquals(1, queueA.getResourceUsage().getVirtualCores()); + + // Queue B has 3 reqs : + // 1) 2 * 1024, 5 .. which will be granted + // 2) 1 * 1024, 1 .. which will be granted + // 3) 1 * 1024, 1 .. which wont + createSchedulingRequest(2 * 1024, 5, "queueB", "user1", 1); + createSchedulingRequest(1 * 1024, 2, "queueB", "user1", 2); + scheduler.update(); + for (int i = 0; i < 3; i ++) { + scheduler.handle(nodeEvent2); + } + + FSLeafQueue queueB = queueMgr.getLeafQueue("queueB", false); + assertEquals(3 * 1024, queueB.getResourceUsage().getMemory()); + assertEquals(6, queueB.getResourceUsage().getVirtualCores()); + + scheduler.update(); + + // Verify that Queue us not starved for fair share.. + // Since the Starvation logic now uses DRF when the policy = drf, The + // Queue should not be starved + assertFalse(queueB.isStarvedForFairShare()); + } + @Test public void testConcurrentAccess() { conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 8e78322c6cd..ed79a0156d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -100,7 +100,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.ControlledClock; -import org.apache.hadoop.yarn.util.SystemClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.After; import org.junit.Assert; @@ -1706,8 +1705,8 @@ public void testPreemptionIsNotDelayedToNextRound() throws Exception { clock.tickSec(11); scheduler.update(); - Resource toPreempt = scheduler.resToPreempt(scheduler.getQueueManager() - .getLeafQueue("queueA.queueA2", false), clock.getTime()); + Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager() + .getLeafQueue("queueA.queueA2", false), clock.getTime()); assertEquals(3277, toPreempt.getMemory()); // verify if the 3 containers required by queueA2 are preempted in the same @@ -1829,25 +1828,173 @@ public void testPreemptionDecision() throws Exception { scheduler.getQueueManager().getLeafQueue("queueD", true); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(schedC, clock.getTime()))); + Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime()))); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(schedD, clock.getTime()))); + Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime()))); // After minSharePreemptionTime has passed, they should want to preempt min // share. clock.tickSec(6); assertEquals( - 1024, scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory()); assertEquals( - 1024, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); // After fairSharePreemptionTime has passed, they should want to preempt // fair share. scheduler.update(); clock.tickSec(6); assertEquals( - 1536 , scheduler.resToPreempt(schedC, clock.getTime()).getMemory()); + 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory()); assertEquals( - 1536, scheduler.resToPreempt(schedD, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); + } + + @Test +/** + * Tests the timing of decision to preempt tasks. + */ + public void testPreemptionDecisionWithDRF() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,1vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,2vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,3vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,2vcores"); + out.println(""); + out.println("5"); + out.println("10"); + out.println(".5"); + out.println("drf"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + // Queue A and B each request three containers + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); + ApplicationAttemptId app5 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); + ApplicationAttemptId app6 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 2; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + } + + // Now new requests arrive from queues C and D + ApplicationAttemptId app7 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); + ApplicationAttemptId app8 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); + ApplicationAttemptId app9 = + createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); + + ApplicationAttemptId app10 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1); + ApplicationAttemptId app11 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2); + ApplicationAttemptId app12 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3); + + scheduler.update(); + + FSLeafQueue schedC = + scheduler.getQueueManager().getLeafQueue("queueC", true); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime()))); + assertTrue(Resources.equals( + Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime()))); + + // Test : + // 1) whether componentWise min works as expected. + // 2) DRF calculator is used + + // After minSharePreemptionTime has passed, they should want to preempt min + // share. + clock.tickSec(6); + Resource res = scheduler.resourceDeficit(schedC, clock.getTime()); + assertEquals(1024, res.getMemory()); + // Demand = 3 + assertEquals(3, res.getVirtualCores()); + + res = scheduler.resourceDeficit(schedD, clock.getTime()); + assertEquals(1024, res.getMemory()); + // Demand = 6, but min share = 2 + assertEquals(2, res.getVirtualCores()); + + // After fairSharePreemptionTime has passed, they should want to preempt + // fair share. + scheduler.update(); + clock.tickSec(6); + res = scheduler.resourceDeficit(schedC, clock.getTime()); + assertEquals(1536, res.getMemory()); + assertEquals(3, res.getVirtualCores()); + + res = scheduler.resourceDeficit(schedD, clock.getTime()); + assertEquals(1536, res.getMemory()); + // Demand = 6, but fair share = 3 + assertEquals(3, res.getVirtualCores()); } @Test @@ -1964,71 +2111,71 @@ public void testPreemptionDecisionWithVariousTimeout() throws Exception { FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(queueB1, clock.getTime()))); + Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime()))); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(queueB2, clock.getTime()))); + Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime()))); assertTrue(Resources.equals( - Resources.none(), scheduler.resToPreempt(queueC, clock.getTime()))); + Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime()))); // After 5 seconds, queueB1 wants to preempt min share scheduler.update(); clock.tickSec(6); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); assertEquals( - 0, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); assertEquals( - 0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); // After 10 seconds, queueB2 wants to preempt min share scheduler.update(); clock.tickSec(5); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); assertEquals( - 0, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); // After 15 seconds, queueC wants to preempt min share scheduler.update(); clock.tickSec(5); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); // After 20 seconds, queueB2 should want to preempt fair share scheduler.update(); clock.tickSec(5); assertEquals( - 1024, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); // After 25 seconds, queueB1 should want to preempt fair share scheduler.update(); clock.tickSec(5); assertEquals( - 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); assertEquals( - 1024, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); // After 30 seconds, queueC should want to preempt fair share scheduler.update(); clock.tickSec(5); assertEquals( - 1536, scheduler.resToPreempt(queueB1, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueB2, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory()); assertEquals( - 1536, scheduler.resToPreempt(queueC, clock.getTime()).getMemory()); + 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); } @Test