diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 37deb3c6919..14ecd2b46cc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -79,6 +79,9 @@ Release 2.9.0 - UNRELEASED
YARN-4708. Missing default mapper type in TimelineServer performance test tool
usage. (Kai Sasaki via ozawa)
+ YARN-4648. Move preemption related tests from TestFairScheduler to
+ TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
+
OPTIMIZATIONS
BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 8b5263c25b0..a7d27538963 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -57,7 +57,6 @@ import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -94,11 +93,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSch
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
@@ -1793,1122 +1790,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(0.5f, root.getFairSharePreemptionThreshold(), 0.01);
}
- @Test (timeout = 5000)
- /**
- * Make sure containers are chosen to be preempted in the correct order.
- */
- public void testChoiceOfPreemptedContainers() throws Exception {
- conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create two nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- // Queue A and B each request two applications
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
-
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
- createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
-
- scheduler.update();
-
- scheduler.getQueueManager().getLeafQueue("queueA", true)
- .setPolicy(SchedulingPolicy.parse("fifo"));
- scheduler.getQueueManager().getLeafQueue("queueB", true)
- .setPolicy(SchedulingPolicy.parse("fair"));
-
- // Sufficient node check-ins to fully schedule containers
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- for (int i = 0; i < 4; i++) {
- scheduler.handle(nodeUpdate1);
- scheduler.handle(nodeUpdate2);
- }
-
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-
- // Now new requests arrive from queueC and default
- createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
- scheduler.update();
-
- // We should be able to claw back one container from queueA and queueB each.
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
- // First verify we are adding containers to preemption list for the app.
- // For queueA (fifo), app2 is selected.
- // For queueB (fair), app4 is selected.
- assertTrue("App2 should have container to be preempted",
- !Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
- assertTrue("App4 should have container to be preempted",
- !Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
-
- // Pretend 15 seconds have passed
- clock.tickSec(15);
-
- // Trigger a kill by insisting we want containers back
- scheduler.preemptResources(Resources.createResource(2 * 1024));
-
- // At this point the containers should have been killed (since we are not simulating AM)
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- // Inside each app, containers are sorted according to their priorities.
- // Containers with priority 4 are preempted for app2 and app4.
- Set set = new HashSet();
- for (RMContainer container :
- scheduler.getSchedulerApp(app2).getLiveContainers()) {
- if (container.getAllocatedPriority().getPriority() == 4) {
- set.add(container);
- }
- }
- for (RMContainer container :
- scheduler.getSchedulerApp(app4).getLiveContainers()) {
- if (container.getAllocatedPriority().getPriority() == 4) {
- set.add(container);
- }
- }
- assertTrue("Containers with priority=4 in app2 and app4 should be " +
- "preempted.", set.isEmpty());
-
- // Trigger a kill by insisting we want containers back
- scheduler.preemptResources(Resources.createResource(2 * 1024));
-
- // Pretend 15 seconds have passed
- clock.tickSec(15);
-
- // We should be able to claw back another container from A and B each.
- // For queueA (fifo), continue preempting from app2.
- // For queueB (fair), even app4 has a lowest priority container with p=4, it
- // still preempts from app3 as app3 is most over fair share.
- scheduler.preemptResources(Resources.createResource(2 * 1024));
-
- assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
-
- // Now A and B are below fair share, so preemption shouldn't do anything
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- assertTrue("App1 should have no container to be preempted",
- scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
- assertTrue("App2 should have no container to be preempted",
- scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
- assertTrue("App3 should have no container to be preempted",
- scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
- assertTrue("App4 should have no container to be preempted",
- scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
- }
-
- @Test
- public void testPreemptionIsNotDelayedToNextRound() throws Exception {
- conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("8");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("2");
- out.println("");
- out.println("10");
- out.println(".5");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Add a node of 8G
- RMNode node1 = MockNodes.newNodeInfo(1,
- Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- // Run apps in queueA.A1 and queueB
- ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
- "queueA.queueA1", "user1", 7, 1);
- // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
- ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
- "user2", 1, 1);
-
- scheduler.update();
-
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- for (int i = 0; i < 8; i++) {
- scheduler.handle(nodeUpdate1);
- }
-
- // verify if the apps got the containers they requested
- assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
-
- // Now submit an app in queueA.queueA2
- ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
- "queueA.queueA2", "user3", 7, 1);
- scheduler.update();
-
- // Let 11 sec pass
- clock.tickSec(11);
-
- scheduler.update();
- Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
- .getLeafQueue("queueA.queueA2", false), clock.getTime());
- assertEquals(3277, toPreempt.getMemory());
-
- // verify if the 3 containers required by queueA2 are preempted in the same
- // round
- scheduler.preemptResources(toPreempt);
- assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
- .size());
- }
-
- @Test (timeout = 5000)
- /**
- * Tests the timing of decision to preempt tasks.
- */
- public void testPreemptionDecision() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("0mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("5");
- out.println("10");
- out.println(".5");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- // Queue A and B each request three containers
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
-
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
- ApplicationAttemptId app5 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
- ApplicationAttemptId app6 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
- }
-
- // Now new requests arrive from queues C and D
- ApplicationAttemptId app7 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- ApplicationAttemptId app8 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- ApplicationAttemptId app9 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- ApplicationAttemptId app10 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
- ApplicationAttemptId app11 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
- ApplicationAttemptId app12 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
-
- scheduler.update();
-
- FSLeafQueue schedC =
- scheduler.getQueueManager().getLeafQueue("queueC", true);
- FSLeafQueue schedD =
- scheduler.getQueueManager().getLeafQueue("queueD", true);
-
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
- // After minSharePreemptionTime has passed, they should want to preempt min
- // share.
- clock.tickSec(6);
- assertEquals(
- 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
- assertEquals(
- 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
-
- // After fairSharePreemptionTime has passed, they should want to preempt
- // fair share.
- scheduler.update();
- clock.tickSec(6);
- assertEquals(
- 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
- assertEquals(
- 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
- }
-
- @Test
-/**
- * Tests the timing of decision to preempt tasks.
- */
- public void testPreemptionDecisionWithDRF() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("0mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,1vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,2vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,3vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,2vcores");
- out.println("");
- out.println("5");
- out.println("10");
- out.println(".5");
- out.println("drf");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- // Queue A and B each request three containers
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
-
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
- ApplicationAttemptId app5 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
- ApplicationAttemptId app6 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 2; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
- }
-
- // Now new requests arrive from queues C and D
- ApplicationAttemptId app7 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- ApplicationAttemptId app8 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- ApplicationAttemptId app9 =
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- ApplicationAttemptId app10 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
- ApplicationAttemptId app11 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
- ApplicationAttemptId app12 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
-
- scheduler.update();
-
- FSLeafQueue schedC =
- scheduler.getQueueManager().getLeafQueue("queueC", true);
- FSLeafQueue schedD =
- scheduler.getQueueManager().getLeafQueue("queueD", true);
-
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
-
- // Test :
- // 1) whether componentWise min works as expected.
- // 2) DRF calculator is used
-
- // After minSharePreemptionTime has passed, they should want to preempt min
- // share.
- clock.tickSec(6);
- Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
- assertEquals(1024, res.getMemory());
- // Demand = 3
- assertEquals(3, res.getVirtualCores());
-
- res = scheduler.resourceDeficit(schedD, clock.getTime());
- assertEquals(1024, res.getMemory());
- // Demand = 6, but min share = 2
- assertEquals(2, res.getVirtualCores());
-
- // After fairSharePreemptionTime has passed, they should want to preempt
- // fair share.
- scheduler.update();
- clock.tickSec(6);
- res = scheduler.resourceDeficit(schedC, clock.getTime());
- assertEquals(1536, res.getMemory());
- assertEquals(3, res.getVirtualCores());
-
- res = scheduler.resourceDeficit(schedD, clock.getTime());
- assertEquals(1536, res.getMemory());
- // Demand = 6, but fair share = 3
- assertEquals(3, res.getVirtualCores());
- }
-
- @Test
- /**
- * Tests the various timing of decision to preempt tasks.
- */
- public void testPreemptionDecisionWithVariousTimeout() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("0mb,0vcores");
- out.println("");
- out.println("");
- out.println("1");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println("2");
- out.println("10");
- out.println("25");
- out.println("");
- out.println("1024mb,0vcores");
- out.println("5");
- out.println("");
- out.println("");
- out.println("1024mb,0vcores");
- out.println("20");
- out.println("");
- out.println("");
- out.println("");
- out.println("1");
- out.println("1024mb,0vcores");
- out.println("");
- out.print("15");
- out.print("30");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Check the min/fair share preemption timeout for each queue
- QueueManager queueMgr = scheduler.getQueueManager();
- assertEquals(30000, queueMgr.getQueue("root")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("default")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueA")
- .getFairSharePreemptionTimeout());
- assertEquals(25000, queueMgr.getQueue("queueB")
- .getFairSharePreemptionTimeout());
- assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
- .getFairSharePreemptionTimeout());
- assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueC")
- .getFairSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("root")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("default")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueA")
- .getMinSharePreemptionTimeout());
- assertEquals(10000, queueMgr.getQueue("queueB")
- .getMinSharePreemptionTimeout());
- assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
- .getMinSharePreemptionTimeout());
- assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueC")
- .getMinSharePreemptionTimeout());
-
- // Create one big node
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- // Queue A takes all resources
- for (int i = 0; i < 6; i ++) {
- createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
- }
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- for (int i = 0; i < 6; i++) {
- scheduler.handle(nodeUpdate1);
- }
-
- // Now new requests arrive from queues B1, B2 and C
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
- createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
- createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
- createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
-
- scheduler.update();
-
- FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
- FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
- FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
-
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
- assertTrue(Resources.equals(
- Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
-
- // After 5 seconds, queueB1 wants to preempt min share
- scheduler.update();
- clock.tickSec(6);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
- assertEquals(
- 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
- assertEquals(
- 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
-
- // After 10 seconds, queueB2 wants to preempt min share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
- assertEquals(
- 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
-
- // After 15 seconds, queueC wants to preempt min share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
-
- // After 20 seconds, queueB2 should want to preempt fair share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
-
- // After 25 seconds, queueB1 should want to preempt fair share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
- assertEquals(
- 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
-
- // After 30 seconds, queueC should want to preempt fair share
- scheduler.update();
- clock.tickSec(5);
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
- assertEquals(
- 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
- }
-
- @Test
- /**
- * Tests the decision to preempt tasks respect to non-preemptable queues
- * 1, Queues as follow:
- * queueA(non-preemptable)
- * queueB(preemptable)
- * parentQueue(non-preemptable)
- * --queueC(preemptable)
- * queueD(preemptable)
- * 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare
- * 3, Now all resource are occupied
- * 4, Submit request to queueD, and need to preempt resource from other queues
- * 5, Only preemptable queue(queueB) would be preempted.
- */
- public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("0mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("false");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println("false");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("2048mb,0vcores");
- out.println("");
- out.println("5");
- out.println("10");
- out.println(".5");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes(3G each)
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- RMNode node4 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
- "127.0.0.4");
- NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
- scheduler.handle(nodeEvent4);
-
- // Submit apps to queueA, queueB, queueC,
- // now all resource of the cluster is occupied
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
-
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
-
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
- // Now new requests arrive from queues D
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
- scheduler.update();
- FSLeafQueue schedD =
- scheduler.getQueueManager().getLeafQueue("queueD", true);
-
- // After minSharePreemptionTime has passed, 2G resource should preempted from
- // queueB to queueD
- clock.tickSec(6);
- assertEquals(2048,
- scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
-
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- // now only app2 is selected to be preempted
- assertTrue("App2 should have container to be preempted",
- !Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
- assertTrue("App1 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app1).getLiveContainers(),
- scheduler.getSchedulerApp(app1).getPreemptionContainers()));
- assertTrue("App3 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app3).getLiveContainers(),
- scheduler.getSchedulerApp(app3).getPreemptionContainers()));
- // Pretend 20 seconds have passed
- clock.tickSec(20);
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
- // after preemption
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- }
-
- @Test
- /**
- * Tests the decision to preempt tasks when allowPreemptionFrom is set false on
- * all queues.
- * Then none of them would be preempted actually.
- * 1, Queues as follow:
- * queueA(non-preemptable)
- * queueB(non-preemptable)
- * parentQueue(non-preemptable)
- * --queueC(preemptable)
- * parentQueue(preemptable)
- * --queueD(non-preemptable)
- * 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare
- * 3, Now all resource are occupied
- * 4, Submit request to queueA, and need to preempt resource from other queues
- * 5, None of queues would be preempted.
- */
- public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
- throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("0mb,0vcores");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("2048mb,0vcores");
- out.println("false");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("false");
- out.println("");
- out.println("");
- out.println("false");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println(".25");
- out.println("1024mb,0vcores");
- out.println("false");
- out.println("");
- out.println("");
- out.println("5");
- out.println("10");
- out.println(".5");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Create four nodes(3G each)
- RMNode node1 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
- "127.0.0.1");
- NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
- scheduler.handle(nodeEvent1);
-
- RMNode node2 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
- "127.0.0.2");
- NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
- scheduler.handle(nodeEvent2);
-
- RMNode node3 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
- "127.0.0.3");
- NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
- scheduler.handle(nodeEvent3);
-
- RMNode node4 =
- MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
- "127.0.0.4");
- NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
- scheduler.handle(nodeEvent4);
-
- // Submit apps to queueB, queueC, queueD
- // now all resource of the cluster is occupied
-
- ApplicationAttemptId app1 =
- createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
- ApplicationAttemptId app2 =
- createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
- ApplicationAttemptId app3 =
- createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
-
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
-
- // Now new requests arrive from queues A
- ApplicationAttemptId app4 =
- createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
- scheduler.update();
- FSLeafQueue schedA =
- scheduler.getQueueManager().getLeafQueue("queueA", true);
-
- // After minSharePreemptionTime has passed, resource deficit is 2G
- clock.tickSec(6);
- assertEquals(2048,
- scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
-
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- // now none app is selected to be preempted
- assertTrue("App1 should have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app1).getLiveContainers(),
- scheduler.getSchedulerApp(app1).getPreemptionContainers()));
- assertTrue("App2 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app2).getLiveContainers(),
- scheduler.getSchedulerApp(app2).getPreemptionContainers()));
- assertTrue("App3 should not have container to be preempted",
- Collections.disjoint(
- scheduler.getSchedulerApp(app3).getLiveContainers(),
- scheduler.getSchedulerApp(app3).getPreemptionContainers()));
- // Pretend 20 seconds have passed
- clock.tickSec(20);
- scheduler.preemptResources(Resources.createResource(2 * 1024));
- for (int i = 0; i < 3; i++) {
- NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
- scheduler.handle(nodeUpdate1);
-
- NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
- scheduler.handle(nodeUpdate2);
-
- NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
- scheduler.handle(nodeUpdate3);
-
- NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
- scheduler.handle(nodeUpdate4);
- }
- // after preemption
- assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
- assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
- assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
- }
-
- @Test
- public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
- conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
-
- PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("5");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.print("15");
- out.print("30");
- out.print("40");
- out.println("");
- out.close();
-
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- // Check the min/fair share preemption timeout for each queue
- QueueManager queueMgr = scheduler.getQueueManager();
- assertEquals(30000, queueMgr.getQueue("root")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("default")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueA")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueB")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
- .getFairSharePreemptionTimeout());
- assertEquals(30000, queueMgr.getQueue("queueC")
- .getFairSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("root")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("default")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueA")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueB")
- .getMinSharePreemptionTimeout());
- assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
- .getMinSharePreemptionTimeout());
- assertEquals(15000, queueMgr.getQueue("queueC")
- .getMinSharePreemptionTimeout());
-
- // If both exist, we take the default one
- out = new PrintWriter(new FileWriter(ALLOC_FILE));
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("5");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.println("");
- out.print("15");
- out.print("25");
- out.print("30");
- out.println("");
- out.close();
-
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- assertEquals(25000, queueMgr.getQueue("root")
- .getFairSharePreemptionTimeout());
- }
-
@Test(timeout = 5000)
public void testMultipleContainersWaitingForReservation() throws IOException {
scheduler.init(conf);
@@ -5062,91 +3943,6 @@ public class TestFairScheduler extends FairSchedulerTestBase {
}
}
}
-
- @Test(timeout = 5000)
- public void testRecoverRequestAfterPreemption() throws Exception {
- conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
-
- ControlledClock clock = new ControlledClock();
- scheduler.setClock(clock);
- scheduler.init(conf);
- scheduler.start();
- scheduler.reinitialize(conf, resourceManager.getRMContext());
-
- Priority priority = Priority.newInstance(20);
- String host = "127.0.0.1";
- int GB = 1024;
-
- // Create Node and raised Node Added event
- RMNode node = MockNodes.newNodeInfo(1,
- Resources.createResource(16 * 1024, 4), 0, host);
- NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
- scheduler.handle(nodeEvent);
-
- // Create 3 container requests and place it in ask
- List ask = new ArrayList();
- ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
- priority.getPriority(), 1, true);
- ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
- node.getRackName(), priority.getPriority(), 1, true);
- ResourceRequest offRackRequest = createResourceRequest(GB, 1,
- ResourceRequest.ANY, priority.getPriority(), 1, true);
- ask.add(nodeLocalRequest);
- ask.add(rackLocalRequest);
- ask.add(offRackRequest);
-
- // Create Request and update
- ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
- "user1", ask);
- scheduler.update();
-
- // Sufficient node check-ins to fully schedule containers
- NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
- scheduler.handle(nodeUpdate);
-
- assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
- .size());
- SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
-
- // ResourceRequest will be empty once NodeUpdate is completed
- Assert.assertNull(app.getResourceRequest(priority, host));
-
- ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
- RMContainer rmContainer = app.getRMContainer(containerId1);
-
- // Create a preempt event and register for preemption
- scheduler.warnOrKillContainer(rmContainer);
-
- // Wait for few clock ticks
- clock.tickSec(5);
-
- // preempt now
- scheduler.warnOrKillContainer(rmContainer);
-
- // Trigger container rescheduled event
- scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
- SchedulerEventType.KILL_PREEMPTED_CONTAINER));
-
- List requests = rmContainer.getResourceRequests();
- // Once recovered, resource request will be present again in app
- Assert.assertEquals(3, requests.size());
- for (ResourceRequest request : requests) {
- Assert.assertEquals(1,
- app.getResourceRequest(priority, request.getResourceName())
- .getNumContainers());
- }
-
- // Send node heartbeat
- scheduler.update();
- scheduler.handle(nodeUpdate);
-
- List containers = scheduler.allocate(appAttemptId,
- Collections. emptyList(),
- Collections. emptyList(), null, null, null, null).getContainers();
-
- // Now with updated ResourceRequest, a container is allocated for AM.
- Assert.assertTrue(containers.size() == 1);
- }
@Test
public void testBlacklistNodes() throws Exception {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
index 6f759ce3363..5bdcc08c31d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java
@@ -18,18 +18,31 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.util.ControlledClock;
import org.apache.hadoop.yarn.util.resource.Resources;
+
import org.junit.After;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -37,6 +50,11 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -84,7 +102,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
conf = null;
}
- private void startResourceManager(float utilizationThreshold) {
+ private void startResourceManagerWithStubbedFairScheduler(float utilizationThreshold) {
conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD,
utilizationThreshold);
resourceManager = new MockRM(conf);
@@ -98,6 +116,51 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
scheduler.updateInterval = 60 * 1000;
}
+ // YARN-4648: The ResourceManager mock startup code below was moved here
+ // from TestFairScheduler. It should be kept as-is to guarantee that the
+ // ResourceManager preemption behaviour remains unchanged.
+ private void startResourceManagerWithRealFairScheduler() {
+ scheduler = new FairScheduler();
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
+ ResourceScheduler.class);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+ conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
+ 1024);
+ conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
+ conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+ conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+ conf.setFloat(
+ FairSchedulerConfiguration
+ .RM_SCHEDULER_RESERVATION_THRESHOLD_INCERMENT_MULTIPLE,
+ TEST_RESERVATION_THRESHOLD);
+
+ resourceManager = new MockRM(conf);
+
+ // TODO: This test should really be using MockRM. For now starting stuff
+ // that is needed at a bare minimum.
+ ((AsyncDispatcher)resourceManager.getRMContext().getDispatcher()).start();
+ resourceManager.getRMContext().getStateStore().start();
+
+ // to initialize the master key
+ resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
+
+ scheduler.setRMContext(resourceManager.getRMContext());
+ }
+
+ private void stopResourceManager() {
+ if (scheduler != null) {
+ scheduler.stop();
+ scheduler = null;
+ }
+ if (resourceManager != null) {
+ resourceManager.stop();
+ resourceManager = null;
+ }
+ QueueMetrics.clearQueueMetrics();
+ DefaultMetricsSystem.shutdown();
+ }
+
private void registerNodeAndSubmitApp(
int memory, int vcores, int appContainers, int appMemory) {
RMNode node1 = MockNodes.newNodeInfo(
@@ -143,7 +206,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
out.println("");
out.close();
- startResourceManager(0f);
+ startResourceManagerWithStubbedFairScheduler(0f);
// Create node with 4GB memory and 4 vcores
registerNodeAndSubmitApp(4 * 1024, 4, 2, 1024);
@@ -159,7 +222,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
resourceManager.stop();
- startResourceManager(0.8f);
+ startResourceManagerWithStubbedFairScheduler(0.8f);
// Create node with 4GB memory and 4 vcores
registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
@@ -175,7 +238,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
resourceManager.stop();
- startResourceManager(0.7f);
+ startResourceManagerWithStubbedFairScheduler(0.7f);
// Create node with 4GB memory and 4 vcores
registerNodeAndSubmitApp(4 * 1024, 4, 3, 1024);
@@ -189,4 +252,1226 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
assertEquals("preemptResources() should have been called", 1024,
((StubbedFairScheduler) scheduler).lastPreemptMemory);
}
+
+ @Test (timeout = 5000)
+ /**
+ * Make sure containers are chosen to be preempted in the correct order.
+ */
+ public void testChoiceOfPreemptedContainers() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
+ conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create two nodes
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ // Queue A and B each request two applications
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
+
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
+ createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
+
+ scheduler.update();
+
+ scheduler.getQueueManager().getLeafQueue("queueA", true)
+ .setPolicy(SchedulingPolicy.parse("fifo"));
+ scheduler.getQueueManager().getLeafQueue("queueB", true)
+ .setPolicy(SchedulingPolicy.parse("fair"));
+
+ // Sufficient node check-ins to fully schedule containers
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ for (int i = 0; i < 4; i++) {
+ scheduler.handle(nodeUpdate1);
+ scheduler.handle(nodeUpdate2);
+ }
+
+ assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+ // Now new requests arrive from queueC and default
+ createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
+ scheduler.update();
+
+ // We should be able to claw back one container from queueA and queueB each.
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // First verify we are adding containers to preemption list for the app.
+ // For queueA (fifo), app2 is selected.
+ // For queueB (fair), app4 is selected.
+ assertTrue("App2 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App4 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+
+ // Pretend 15 seconds have passed
+ clock.tickSec(15);
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+ // At this point the containers should have been killed (since we are not simulating AM)
+ assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ // Inside each app, containers are sorted according to their priorities.
+ // Containers with priority 4 are preempted for app2 and app4.
+ Set set = new HashSet();
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app2).getLiveContainers()) {
+ if (container.getAllocatedPriority().getPriority() == 4) {
+ set.add(container);
+ }
+ }
+ for (RMContainer container :
+ scheduler.getSchedulerApp(app4).getLiveContainers()) {
+ if (container.getAllocatedPriority().getPriority() == 4) {
+ set.add(container);
+ }
+ }
+ assertTrue("Containers with priority=4 in app2 and app4 should be " +
+ "preempted.", set.isEmpty());
+
+ // Trigger a kill by insisting we want containers back
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+ // Pretend 15 seconds have passed
+ clock.tickSec(15);
+
+ // We should be able to claw back another container from A and B each.
+ // For queueA (fifo), continue preempting from app2.
+ // For queueB (fair), even though app4 has a lowest-priority container with
+ // p=4, preemption still targets app3, as app3 is most over its fair share.
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+
+ assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+
+ // Now A and B are below fair share, so preemption shouldn't do anything
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ assertTrue("App1 should have no container to be preempted",
+ scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
+ assertTrue("App2 should have no container to be preempted",
+ scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
+ assertTrue("App3 should have no container to be preempted",
+ scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
+ assertTrue("App4 should have no container to be preempted",
+ scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
+ stopResourceManager();
+ }
+
+ @Test
+ public void testPreemptionIsNotDelayedToNextRound() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+
+ conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
+
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("8");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("2");
+ out.println("");
+ out.println("10");
+ out.println(".5");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Add a node of 8G
+ RMNode node1 = MockNodes.newNodeInfo(1,
+ Resources.createResource(8 * 1024, 8), 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Run apps in queueA.A1 and queueB
+ ApplicationAttemptId app1 = createSchedulingRequest(1 * 1024, 1,
+ "queueA.queueA1", "user1", 7, 1);
+ // createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
+ ApplicationAttemptId app2 = createSchedulingRequest(1 * 1024, 1, "queueB",
+ "user2", 1, 1);
+
+ scheduler.update();
+
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ for (int i = 0; i < 8; i++) {
+ scheduler.handle(nodeUpdate1);
+ }
+
+ // verify if the apps got the containers they requested
+ assertEquals(7, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+
+ // Now submit an app in queueA.queueA2
+ ApplicationAttemptId app3 = createSchedulingRequest(1 * 1024, 1,
+ "queueA.queueA2", "user3", 7, 1);
+ scheduler.update();
+
+ // Let 11 sec pass
+ clock.tickSec(11);
+
+ scheduler.update();
+ Resource toPreempt = scheduler.resourceDeficit(scheduler.getQueueManager()
+ .getLeafQueue("queueA.queueA2", false), clock.getTime());
+ assertEquals(3277, toPreempt.getMemory());
+
+ // verify if the 3 containers required by queueA2 are preempted in the same
+ // round
+ scheduler.preemptResources(toPreempt);
+ assertEquals(3, scheduler.getSchedulerApp(app1).getPreemptionContainers()
+ .size());
+ stopResourceManager();
+ }
+
+ @Test (timeout = 5000)
+ /**
+ * Tests the timing of decision to preempt tasks.
+ */
+ public void testPreemptionDecision() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("5");
+ out.println("10");
+ out.println(".5");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create four nodes
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 =
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
+ "127.0.0.3");
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ // Queue A and B each request three containers
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+ ApplicationAttemptId app5 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+ ApplicationAttemptId app6 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 2; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+ }
+
+ // Now new requests arrive from queues C and D
+ ApplicationAttemptId app7 =
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+ ApplicationAttemptId app8 =
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+ ApplicationAttemptId app9 =
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+ ApplicationAttemptId app10 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
+ ApplicationAttemptId app11 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
+ ApplicationAttemptId app12 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
+
+ scheduler.update();
+
+ FSLeafQueue schedC =
+ scheduler.getQueueManager().getLeafQueue("queueC", true);
+ FSLeafQueue schedD =
+ scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
+ // After minSharePreemptionTime has passed, they should want to preempt min
+ // share.
+ clock.tickSec(6);
+ assertEquals(
+ 1024, scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+
+ // After fairSharePreemptionTime has passed, they should want to preempt
+ // fair share.
+ scheduler.update();
+ clock.tickSec(6);
+ assertEquals(
+ 1536 , scheduler.resourceDeficit(schedC, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+ stopResourceManager();
+ }
+
+ @Test
+ /**
+ * Tests the timing of the decision to preempt tasks when the allocation
+ * file selects "drf" scheduling: resourceDeficit() must be computed
+ * component-wise over both memory and vcores, capped by min share before
+ * the fair-share timeout and by fair share after it.
+ * NOTE(review): the XML element tags inside the out.println(...) string
+ * literals appear to have been stripped from this patch (only the tag
+ * contents remain) - confirm the allocation-file contents upstream.
+ */
+ public void testPreemptionDecisionWithDRF() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // A controlled clock lets the test cross the min-share (5s) and
+ // fair-share (10s) preemption timeouts deterministically via tickSec().
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,1vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,2vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,3vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,2vcores");
+ out.println("");
+ out.println("5");
+ out.println("10");
+ out.println(".5");
+ out.println("drf");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create three nodes, 2GB / 4 vcores each (the original comment said
+ // "four nodes", but only three are registered)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 =
+ MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 4), 3,
+ "127.0.0.3");
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ // Queue A and B each request three containers
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
+
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
+ ApplicationAttemptId app5 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
+ ApplicationAttemptId app6 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 2; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+ }
+
+ // Now new requests arrive from queues C and D; queueD asks for
+ // 2-vcore containers so its vcore demand dominates
+ ApplicationAttemptId app7 =
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+ ApplicationAttemptId app8 =
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+ ApplicationAttemptId app9 =
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+ ApplicationAttemptId app10 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 1);
+ ApplicationAttemptId app11 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 2);
+ ApplicationAttemptId app12 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 2, 3);
+
+ scheduler.update();
+
+ FSLeafQueue schedC =
+ scheduler.getQueueManager().getLeafQueue("queueC", true);
+ FSLeafQueue schedD =
+ scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+ // Before any timeout has elapsed there must be no deficit at all
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(schedC, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(schedD, clock.getTime())));
+
+ // Test :
+ // 1) whether componentWise min works as expected.
+ // 2) DRF calculator is used
+
+ // After minSharePreemptionTime has passed, they should want to preempt min
+ // share.
+ clock.tickSec(6);
+ Resource res = scheduler.resourceDeficit(schedC, clock.getTime());
+ assertEquals(1024, res.getMemory());
+ // Demand = 3
+ assertEquals(3, res.getVirtualCores());
+
+ res = scheduler.resourceDeficit(schedD, clock.getTime());
+ assertEquals(1024, res.getMemory());
+ // Demand = 6, but min share = 2
+ assertEquals(2, res.getVirtualCores());
+
+ // After fairSharePreemptionTime has passed, they should want to preempt
+ // fair share.
+ scheduler.update();
+ clock.tickSec(6);
+ res = scheduler.resourceDeficit(schedC, clock.getTime());
+ assertEquals(1536, res.getMemory());
+ assertEquals(3, res.getVirtualCores());
+
+ res = scheduler.resourceDeficit(schedD, clock.getTime());
+ assertEquals(1536, res.getMemory());
+ // Demand = 6, but fair share = 3
+ assertEquals(3, res.getVirtualCores());
+ stopResourceManager();
+ }
+
+ @Test
+ /**
+ * Tests that per-queue min-share and fair-share preemption timeouts
+ * (including values inherited from parent queues and root defaults)
+ * control exactly when each queue's resourceDeficit() becomes non-zero:
+ * queueB1 at 5s, queueB2 at 10s, queueC at 15s for min share, then
+ * queueB2 at 20s, queueB1 at 25s, queueC at 30s for fair share.
+ */
+ public void testPreemptionDecisionWithVariousTimeout() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ // Controlled clock so each timeout boundary can be crossed exactly
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println("1");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println("2");
+ out.println("10");
+ out.println("25");
+ out.println("");
+ out.println("1024mb,0vcores");
+ out.println("5");
+ out.println("");
+ out.println("");
+ out.println("1024mb,0vcores");
+ out.println("20");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("1");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.print("15");
+ out.print("30");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Check the min/fair share preemption timeout for each queue;
+ // queues without an explicit value inherit from their parent / root
+ QueueManager queueMgr = scheduler.getQueueManager();
+ assertEquals(30000, queueMgr.getQueue("root")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("default")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueA")
+ .getFairSharePreemptionTimeout());
+ assertEquals(25000, queueMgr.getQueue("queueB")
+ .getFairSharePreemptionTimeout());
+ assertEquals(25000, queueMgr.getQueue("queueB.queueB1")
+ .getFairSharePreemptionTimeout());
+ assertEquals(20000, queueMgr.getQueue("queueB.queueB2")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueC")
+ .getFairSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("root")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("default")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("queueA")
+ .getMinSharePreemptionTimeout());
+ assertEquals(10000, queueMgr.getQueue("queueB")
+ .getMinSharePreemptionTimeout());
+ assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+ .getMinSharePreemptionTimeout());
+ assertEquals(10000, queueMgr.getQueue("queueB.queueB2")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("queueC")
+ .getMinSharePreemptionTimeout());
+
+ // Create one big node
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(6 * 1024, 6), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // Queue A takes all resources
+ for (int i = 0; i < 6; i ++) {
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
+ }
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ for (int i = 0; i < 6; i++) {
+ scheduler.handle(nodeUpdate1);
+ }
+
+ // Now new requests arrive from queues B1, B2 and C
+ createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueB.queueB1", "user1", 1, 3);
+ createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueB.queueB2", "user1", 1, 3);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
+ createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
+
+ scheduler.update();
+
+ FSLeafQueue queueB1 = queueMgr.getLeafQueue("queueB.queueB1", true);
+ FSLeafQueue queueB2 = queueMgr.getLeafQueue("queueB.queueB2", true);
+ FSLeafQueue queueC = queueMgr.getLeafQueue("queueC", true);
+
+ // No timeout has elapsed yet, so no queue reports a deficit
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(queueB1, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(queueB2, clock.getTime())));
+ assertTrue(Resources.equals(
+ Resources.none(), scheduler.resourceDeficit(queueC, clock.getTime())));
+
+ // After 5 seconds, queueB1 wants to preempt min share
+ scheduler.update();
+ clock.tickSec(6);
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+ assertEquals(
+ 0, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+ assertEquals(
+ 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+ // After 10 seconds, queueB2 wants to preempt min share
+ scheduler.update();
+ clock.tickSec(5);
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+ assertEquals(
+ 0, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+ // After 15 seconds, queueC wants to preempt min share
+ scheduler.update();
+ clock.tickSec(5);
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+ // After 20 seconds, queueB2 should want to preempt fair share
+ scheduler.update();
+ clock.tickSec(5);
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+ // After 25 seconds, queueB1 should want to preempt fair share
+ scheduler.update();
+ clock.tickSec(5);
+ assertEquals(
+ 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+ assertEquals(
+ 1024, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+
+ // After 30 seconds, queueC should want to preempt fair share
+ scheduler.update();
+ clock.tickSec(5);
+ assertEquals(
+ 1536, scheduler.resourceDeficit(queueB1, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resourceDeficit(queueB2, clock.getTime()).getMemory());
+ assertEquals(
+ 1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
+ stopResourceManager();
+ }
+
+ @Test
+ /**
+ * Tests the decision to preempt tasks with respect to non-preemptable
+ * queues.
+ * 1. Queues are as follows:
+ * queueA(non-preemptable)
+ * queueB(preemptable)
+ * parentQueue(non-preemptable)
+ * --queueC(preemptable)
+ * queueD(preemptable)
+ * 2. Submit requests to queueA, queueB, queueC; all of them are over
+ * min share.
+ * 3. Now all resources are occupied.
+ * 4. Submit a request to queueD, which needs to preempt resources from
+ * other queues.
+ * 5. Only the preemptable queue (queueB) should be preempted.
+ */
+ public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("false");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println("false");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("2048mb,0vcores");
+ out.println("");
+ out.println("5");
+ out.println("10");
+ out.println(".5");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create four nodes(3G each)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+ "127.0.0.3");
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ RMNode node4 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+ "127.0.0.4");
+ NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+ scheduler.handle(nodeEvent4);
+
+ // Submit apps to queueA, queueB, queueC,
+ // now all resource of the cluster is occupied
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
+
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+
+ // Each of the three apps now holds 4 live containers (cluster full)
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // Now new requests arrive from queues D
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
+ scheduler.update();
+ FSLeafQueue schedD =
+ scheduler.getQueueManager().getLeafQueue("queueD", true);
+
+ // After minSharePreemptionTime has passed, 2G of resources should be
+ // preempted from queueB to queueD
+ clock.tickSec(6);
+ assertEquals(2048,
+ scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
+
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ // now only app2 is selected to be preempted
+ assertTrue("App2 should have container to be preempted",
+ !Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App1 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app1).getLiveContainers(),
+ scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+ assertTrue("App3 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app3).getLiveContainers(),
+ scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+ // Pretend 20 seconds have passed
+ clock.tickSec(20);
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+ // after preemption: only app2 (preemptable queueB) lost containers,
+ // and app4 (queueD) received them
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ stopResourceManager();
+ }
+
+ @Test
+ /**
+ * Tests the decision to preempt tasks when allowPreemptionFrom is set
+ * false on all queues, in which case none of them should actually be
+ * preempted.
+ * 1. Queues are as follows:
+ * queueA(non-preemptable)
+ * queueB(non-preemptable)
+ * parentQueue(non-preemptable)
+ * --queueC(preemptable)
+ * parentQueue(preemptable)
+ * --queueD(non-preemptable)
+ * 2. Submit requests to queueB, queueC, queueD; all of them are over
+ * min share.
+ * 3. Now all resources are occupied.
+ * 4. Submit a request to queueA, which needs to preempt resources from
+ * other queues.
+ * 5. None of the queues should be preempted.
+ * NOTE(review): the XML element tags inside the out.println(...) string
+ * literals appear stripped from this patch - confirm upstream.
+ */
+ public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
+ throws Exception {
+ startResourceManagerWithRealFairScheduler();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("0mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("2048mb,0vcores");
+ out.println("false");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("false");
+ out.println("");
+ out.println("");
+ out.println("false");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println(".25");
+ out.println("1024mb,0vcores");
+ out.println("false");
+ out.println("");
+ out.println("");
+ out.println("5");
+ out.println("10");
+ out.println(".5");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Create four nodes(3G each)
+ RMNode node1 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
+ "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ RMNode node2 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
+ "127.0.0.2");
+ NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+ scheduler.handle(nodeEvent2);
+
+ RMNode node3 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
+ "127.0.0.3");
+ NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+ scheduler.handle(nodeEvent3);
+
+ RMNode node4 =
+ MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
+ "127.0.0.4");
+ NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+ scheduler.handle(nodeEvent4);
+
+ // Submit apps to queueB, queueC, queueD
+ // now all resource of the cluster is occupied
+
+ ApplicationAttemptId app1 =
+ createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
+ ApplicationAttemptId app2 =
+ createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
+ ApplicationAttemptId app3 =
+ createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+
+ // Now new requests arrive from queues A
+ ApplicationAttemptId app4 =
+ createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
+ scheduler.update();
+ FSLeafQueue schedA =
+ scheduler.getQueueManager().getLeafQueue("queueA", true);
+
+ // After minSharePreemptionTime has passed, resource deficit is 2G
+ clock.tickSec(6);
+ assertEquals(2048,
+ scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
+
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ // now no app is selected to be preempted: every queue disallows
+ // preemption, so all live/preemption container sets stay disjoint
+ assertTrue("App1 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app1).getLiveContainers(),
+ scheduler.getSchedulerApp(app1).getPreemptionContainers()));
+ assertTrue("App2 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app2).getLiveContainers(),
+ scheduler.getSchedulerApp(app2).getPreemptionContainers()));
+ assertTrue("App3 should not have container to be preempted",
+ Collections.disjoint(
+ scheduler.getSchedulerApp(app3).getLiveContainers(),
+ scheduler.getSchedulerApp(app3).getPreemptionContainers()));
+ // Pretend 20 seconds have passed
+ clock.tickSec(20);
+ scheduler.preemptResources(Resources.createResource(2 * 1024));
+ for (int i = 0; i < 3; i++) {
+ NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
+ scheduler.handle(nodeUpdate1);
+
+ NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
+ scheduler.handle(nodeUpdate2);
+
+ NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
+ scheduler.handle(nodeUpdate3);
+
+ NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
+ scheduler.handle(nodeUpdate4);
+ }
+ // after preemption: nothing was taken away, and app4 got nothing
+ assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
+ assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
+ assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
+ stopResourceManager();
+ }
+
+ @Test
+ /**
+ * Tests backwards compatibility of the preemption-timeout configuration:
+ * the legacy setting must still populate each queue's min/fair share
+ * preemption timeouts, and when both the legacy and the newer
+ * default-timeout settings are present, the default one must win
+ * (verified by the root fair-share timeout becoming 25s after reload).
+ */
+ public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("5");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.print("15");
+ out.print("30");
+ out.println("");
+ out.close();
+
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ // Check the min/fair share preemption timeout for each queue;
+ // unset queues inherit the 30s fair-share / 15s min-share defaults,
+ // queueB1 overrides its min-share timeout to 5s
+ QueueManager queueMgr = scheduler.getQueueManager();
+ assertEquals(30000, queueMgr.getQueue("root")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("default")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueA")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueB")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueB.queueB1")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueB.queueB2")
+ .getFairSharePreemptionTimeout());
+ assertEquals(30000, queueMgr.getQueue("queueC")
+ .getFairSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("root")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("default")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("queueA")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("queueB")
+ .getMinSharePreemptionTimeout());
+ assertEquals(5000, queueMgr.getQueue("queueB.queueB1")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("queueB.queueB2")
+ .getMinSharePreemptionTimeout());
+ assertEquals(15000, queueMgr.getQueue("queueC")
+ .getMinSharePreemptionTimeout());
+
+ // If both the legacy and the new default fair-share timeout settings
+ // exist, the default one takes precedence (25s wins over 30s below)
+ out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("5");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.println("");
+ out.print("15");
+ out.print("25");
+ out.print("30");
+ out.println("");
+ out.close();
+
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ assertEquals(25000, queueMgr.getQueue("root")
+ .getFairSharePreemptionTimeout());
+ stopResourceManager();
+ }
+
+ @Test(timeout = 5000)
+ /**
+ * Tests that after a container is preempted (killed), its original
+ * ResourceRequests (node-local, rack-local, off-rack) are recovered into
+ * the application, so a replacement container can be allocated on the
+ * next node heartbeat.
+ * NOTE(review): generic type parameters (angle-bracket spans) appear to
+ * have been stripped from this patch - e.g. "List ask" was presumably
+ * "List&lt;ResourceRequest&gt; ask" and "Collections. emptyList()" had an
+ * explicit type witness; confirm against the upstream change.
+ */
+ public void testRecoverRequestAfterPreemption() throws Exception {
+ startResourceManagerWithRealFairScheduler();
+ // Short warn-to-kill interval so the second warnOrKillContainer kills
+ conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10);
+
+ ControlledClock clock = new ControlledClock();
+ scheduler.setClock(clock);
+ scheduler.init(conf);
+ scheduler.start();
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ Priority priority = Priority.newInstance(20);
+ String host = "127.0.0.1";
+ int GB = 1024;
+
+ // Create a node and raise a node-added event
+ RMNode node = MockNodes.newNodeInfo(1,
+ Resources.createResource(16 * 1024, 4), 0, host);
+ NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+ scheduler.handle(nodeEvent);
+
+ // Create 3 container requests (node-local, rack-local, off-rack) and
+ // place them in the ask list
+ List ask = new ArrayList();
+ ResourceRequest nodeLocalRequest = createResourceRequest(GB, 1, host,
+ priority.getPriority(), 1, true);
+ ResourceRequest rackLocalRequest = createResourceRequest(GB, 1,
+ node.getRackName(), priority.getPriority(), 1, true);
+ ResourceRequest offRackRequest = createResourceRequest(GB, 1,
+ ResourceRequest.ANY, priority.getPriority(), 1, true);
+ ask.add(nodeLocalRequest);
+ ask.add(rackLocalRequest);
+ ask.add(offRackRequest);
+
+ // Create Request and update
+ ApplicationAttemptId appAttemptId = createSchedulingRequest("queueA",
+ "user1", ask);
+ scheduler.update();
+
+ // Sufficient node check-ins to fully schedule containers
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
+ scheduler.handle(nodeUpdate);
+
+ assertEquals(1, scheduler.getSchedulerApp(appAttemptId).getLiveContainers()
+ .size());
+ SchedulerApplicationAttempt app = scheduler.getSchedulerApp(appAttemptId);
+
+ // ResourceRequest will be empty once NodeUpdate is completed
+ Assert.assertNull(app.getResourceRequest(priority, host));
+
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+ RMContainer rmContainer = app.getRMContainer(containerId1);
+
+ // Create a preempt event and register for preemption (first call only
+ // warns; the kill happens on the second call after the wait time)
+ scheduler.warnOrKillContainer(rmContainer);
+
+ // Wait for few clock ticks
+ clock.tickSec(5);
+
+ // preempt now
+ scheduler.warnOrKillContainer(rmContainer);
+
+ // Trigger container rescheduled event
+ scheduler.handle(new ContainerPreemptEvent(appAttemptId, rmContainer,
+ SchedulerEventType.KILL_PREEMPTED_CONTAINER));
+
+ List requests = rmContainer.getResourceRequests();
+ // Once recovered, resource request will be present again in app
+ Assert.assertEquals(3, requests.size());
+ for (ResourceRequest request : requests) {
+ Assert.assertEquals(1,
+ app.getResourceRequest(priority, request.getResourceName())
+ .getNumContainers());
+ }
+
+ // Send node heartbeat
+ scheduler.update();
+ scheduler.handle(nodeUpdate);
+
+ List containers = scheduler.allocate(appAttemptId,
+ Collections. emptyList(),
+ Collections. emptyList(), null, null, null, null).getContainers();
+
+ // Now with updated ResourceRequest, a container is allocated for AM.
+ Assert.assertTrue(containers.size() == 1);
+ stopResourceManager();
+ }
}