diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index c03b0f28bcd..2ae02dab391 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -9,6 +9,8 @@ Release 2.9.0 - UNRELEASED YARN-1856. Added cgroups based memory monitoring for containers as another alternative to custom memory-monitoring. (Varun Vasudev via vinodkv) + YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via kasha) + IMPROVEMENTS YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index bf4eae83fad..180ae49c8ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -98,6 +98,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { // Reservation system configuration private ReservationQueueConfiguration globalReservationQueueConfig; + private final Set nonPreemptableQueues; + public AllocationConfiguration(Map minQueueResources, Map maxQueueResources, Map queueMaxApps, Map userMaxApps, @@ -114,7 +116,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { QueuePlacementPolicy placementPolicy, Map> configuredQueues, ReservationQueueConfiguration globalReservationQueueConfig, - Set reservableQueues) { + Set reservableQueues, + Set nonPreemptableQueues) { this.minQueueResources = minQueueResources; this.maxQueueResources = maxQueueResources; this.queueMaxApps = queueMaxApps; @@ -135,6 +138,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { this.globalReservationQueueConfig = globalReservationQueueConfig; this.placementPolicy = placementPolicy; this.configuredQueues = configuredQueues; + this.nonPreemptableQueues = nonPreemptableQueues; } public AllocationConfiguration(Configuration conf) { @@ -161,6 +165,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { } placementPolicy = QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); + nonPreemptableQueues = new HashSet(); } /** @@ -210,6 +215,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { -1f : fairSharePreemptionThreshold; } + public boolean isPreemptable(String queueName) { + return !nonPreemptableQueues.contains(queueName); + } + public ResourceWeights getQueueWeight(String queue) { ResourceWeights weight = queueWeights.get(queue); return (weight == null) ? ResourceWeights.NEUTRAL : weight; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index 9a31be36242..d6012affc7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -224,6 +224,7 @@ public class AllocationFileLoaderService extends AbstractService { Map> queueAcls = new HashMap>(); Set reservableQueues = new HashSet(); + Set nonPreemptableQueues = new HashSet(); int userMaxAppsDefault = Integer.MAX_VALUE; int queueMaxAppsDefault = Integer.MAX_VALUE; Resource queueMaxResourcesDefault = Resources.unbounded(); @@ -360,7 +361,7 @@ public class AllocationFileLoaderService extends AbstractService { queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, configuredQueues, - reservableQueues); + reservableQueues, nonPreemptableQueues); } // Load placement policy and pass it configured queues @@ -409,7 +410,7 @@ public class AllocationFileLoaderService extends AbstractService { defaultSchedPolicy, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, newPlacementPolicy, configuredQueues, globalReservationQueueConfig, - reservableQueues); + reservableQueues, nonPreemptableQueues); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; @@ -431,7 +432,8 @@ public class AllocationFileLoaderService extends AbstractService { Map fairSharePreemptionThresholds, Map> queueAcls, Map> configuredQueues, - Set reservableQueues) + Set reservableQueues, + Set nonPreemptableQueues) throws AllocationConfigurationException { String queueName = element.getAttribute("name").trim(); @@ -508,13 +510,19 @@ public class AllocationFileLoaderService extends AbstractService { isLeaf = false; reservableQueues.add(queueName); configuredQueues.get(FSQueueType.PARENT).add(queueName); + } else if ("allowPreemptionFrom".equals(field.getTagName())) { + String text = ((Text)field.getFirstChild()).getData().trim(); + if (!Boolean.parseBoolean(text)) { + nonPreemptableQueues.add(queueName); + } } else if ("queue".endsWith(field.getTagName()) || "pool".equals(field.getTagName())) { loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights, queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts, fairSharePreemptionThresholds, - queueAcls, configuredQueues, reservableQueues); + queueAcls, configuredQueues, reservableQueues, + nonPreemptableQueues); isLeaf = false; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index febe050fe1f..a028422220e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -260,6 +260,14 @@ public class FSParentQueue extends FSQueue { readLock.lock(); try { for (FSQueue queue : childQueues) { + // Skip selection for non-preemptable queue + if (!queue.isPreemptable()) { + if (LOG.isDebugEnabled()) { + LOG.debug("skipping from queue=" + getName() + + " because it's a non-preemptable queue"); + } + continue; + } if (candidateQueue == null || comparator.compare(queue, candidateQueue) > 0) { candidateQueue = queue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 713bdcad7c4..f82411daed5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -62,6 +62,7 @@ public abstract class FSQueue implements Queue, Schedulable { private long fairSharePreemptionTimeout = Long.MAX_VALUE; private long minSharePreemptionTimeout = Long.MAX_VALUE; private float fairSharePreemptionThreshold = 0.5f; + private boolean preemptable = true; public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) { this.name = name; @@ -235,6 +236,10 @@ public abstract class FSQueue implements Queue, Schedulable { this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; } + public boolean isPreemptable() { + return preemptable; + } + /** * Recomputes the shares for all child queues and applications based on this * queue's current share @@ -242,7 +247,8 @@ public abstract class FSQueue implements Queue, Schedulable { public abstract void recomputeShares(); /** - * Update the min/fair share preemption timeouts and threshold for this queue. + * Update the min/fair share preemption timeouts, threshold and preemption + * disabled flag for this queue. */ public void updatePreemptionVariables() { // For min share timeout @@ -263,6 +269,9 @@ public abstract class FSQueue implements Queue, Schedulable { if (fairSharePreemptionThreshold < 0 && parent != null) { fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold(); } + // For option whether allow preemption from this queue + preemptable = scheduler.getAllocationConfiguration() + .isPreemptable(getName()); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java index 5ff9422e1ce..689622f0d44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java @@ -81,6 +81,7 @@ public class FairSchedulerPage extends RmView { } ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString()); ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString()); + ri._("Preemptable:", qinfo.isPreemptable()); html._(InfoBlock.class); // clear the info contents so this queue's info doesn't accumulate into another queue's info diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java index ee37f184eda..e02df65638f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java @@ -65,6 +65,8 @@ public class FairSchedulerQueueInfo { private String queueName; private String schedulingPolicy; + private boolean preemptable; + private FairSchedulerQueueInfoList childQueues; public FairSchedulerQueueInfo() { @@ -108,6 +110,7 @@ public class FairSchedulerQueueInfo { return; } + preemptable = queue.isPreemptable(); childQueues = getChildQueues(queue, scheduler); } @@ -228,4 +231,8 @@ public class FairSchedulerQueueInfo { return childQueues != null ? childQueues.getQueueInfoList() : new ArrayList(); } + + public boolean isPreemptable() { + return preemptable; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 4a32bc6bb5d..8b5263c25b0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2492,6 +2492,333 @@ public class TestFairScheduler extends FairSchedulerTestBase { 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory()); } + @Test + /** + * Tests the decision to preempt tasks respect to non-preemptable queues + * 1, Queues as follow: + * queueA(non-preemptable) + * queueB(preemptable) + * parentQueue(non-preemptable) + * --queueC(preemptable) + * queueD(preemptable) + * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare + * 3, Now all resource are occupied + * 4, Submit request to queueD, and need to preempt resource from other queues + * 5, Only preemptable queue(queueB) would be preempted. + */ + public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println("false"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println("false"); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(".25"); + out.println("2048mb,0vcores"); + out.println(""); + out.println("5"); + out.println("10"); + out.println(".5"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes(3G each) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, + "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + // Submit apps to queueA, queueB, queueC, + // now all resource of the cluster is occupied + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3); + + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // Now new requests arrive from queues D + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1); + scheduler.update(); + FSLeafQueue schedD = + scheduler.getQueueManager().getLeafQueue("queueD", true); + + // After minSharePreemptionTime has passed, 2G resource should preempted from + // queueB to queueD + clock.tickSec(6); + assertEquals(2048, + scheduler.resourceDeficit(schedD, clock.getTime()).getMemory()); + + scheduler.preemptResources(Resources.createResource(2 * 1024)); + // now only app2 is selected to be preempted + assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App1 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app1).getLiveContainers(), + scheduler.getSchedulerApp(app1).getPreemptionContainers())); + assertTrue("App3 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app3).getLiveContainers(), + scheduler.getSchedulerApp(app3).getPreemptionContainers())); + // Pretend 20 seconds have passed + clock.tickSec(20); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + // after preemption + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + } + + @Test + /** + * Tests the decision to preempt tasks when allowPreemptionFrom is set false on + * all queues. + * Then none of them would be preempted actually. + * 1, Queues as follow: + * queueA(non-preemptable) + * queueB(non-preemptable) + * parentQueue(non-preemptable) + * --queueC(preemptable) + * parentQueue(preemptable) + * --queueD(non-preemptable) + * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare + * 3, Now all resource are occupied + * 4, Submit request to queueA, and need to preempt resource from other queues + * 5, None of queues would be preempted. + */ + public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues() + throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + ControlledClock clock = new ControlledClock(); + scheduler.setClock(clock); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println(""); + out.println("0mb,0vcores"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("2048mb,0vcores"); + out.println("false"); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println("false"); + out.println(""); + out.println(""); + out.println("false"); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println(""); + out.println(""); + out.println(""); + out.println(""); + out.println(".25"); + out.println("1024mb,0vcores"); + out.println("false"); + out.println(""); + out.println(""); + out.println("5"); + out.println("10"); + out.println(".5"); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Create four nodes(3G each) + RMNode node1 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1, + "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + RMNode node2 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2, + "127.0.0.2"); + NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); + scheduler.handle(nodeEvent2); + + RMNode node3 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3, + "127.0.0.3"); + NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); + scheduler.handle(nodeEvent3); + + RMNode node4 = + MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4, + "127.0.0.4"); + NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4); + scheduler.handle(nodeEvent4); + + // Submit apps to queueB, queueC, queueD + // now all resource of the cluster is occupied + + ApplicationAttemptId app1 = + createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1); + ApplicationAttemptId app2 = + createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3); + scheduler.update(); + + // Sufficient node check-ins to fully schedule containers + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // Now new requests arrive from queues A + ApplicationAttemptId app4 = + createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1); + scheduler.update(); + FSLeafQueue schedA = + scheduler.getQueueManager().getLeafQueue("queueA", true); + + // After minSharePreemptionTime has passed, resource deficit is 2G + clock.tickSec(6); + assertEquals(2048, + scheduler.resourceDeficit(schedA, clock.getTime()).getMemory()); + + scheduler.preemptResources(Resources.createResource(2 * 1024)); + // now none app is selected to be preempted + assertTrue("App1 should have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app1).getLiveContainers(), + scheduler.getSchedulerApp(app1).getPreemptionContainers())); + assertTrue("App2 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App3 should not have container to be preempted", + Collections.disjoint( + scheduler.getSchedulerApp(app3).getLiveContainers(), + scheduler.getSchedulerApp(app3).getPreemptionContainers())); + // Pretend 20 seconds have passed + clock.tickSec(20); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + for (int i = 0; i < 3; i++) { + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(nodeUpdate1); + + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + scheduler.handle(nodeUpdate2); + + NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); + scheduler.handle(nodeUpdate3); + + NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4); + scheduler.handle(nodeUpdate4); + } + // after preemption + assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + } + @Test public void testBackwardsCompatiblePreemptionConfiguration() throws Exception { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);