From c4c77669f0c275eb326dbde5ecf433402f3cf100 Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Fri, 14 Nov 2014 15:18:56 -0800
Subject: [PATCH] YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max share (Siqi Li via Sandy Ryza)

---
 hadoop-yarn-project/CHANGES.txt               |  3 +
 .../scheduler/fair/FairScheduler.java         | 32 ++++++-
 .../scheduler/fair/TestFairScheduler.java     | 79 +++++++++++++++++++
 3 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 0f3d31e60dc..0a60f5c3479 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -198,6 +198,9 @@ Release 2.6.0 - 2014-11-18
     YARN-2505. Supported get/add/remove/change labels in RM REST API.
     (Craig Welch via zjshen)
 
+    YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
+    share (Siqi Li via Sandy Ryza)
+
   IMPROVEMENTS
 
     YARN-2242. Improve exception information on AM launch crashes. (Li Lu
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index 3fc3019a8b5..94fb8491759 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -1029,7 +1029,11 @@ public class FairScheduler extends
     FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
     if (reservedAppSchedulable != null) {
       Priority reservedPriority = node.getReservedContainer().getReservedPriority();
-      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
+      FSQueue queue = reservedAppSchedulable.getQueue();
+
+      if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
+          || !fitInMaxShare(queue,
+              node.getReservedContainer().getReservedResource())) {
         // Don't hold the reservation if app can no longer use it
         LOG.info("Releasing reservation that cannot be satisfied for application "
             + reservedAppSchedulable.getApplicationAttemptId()
@@ -1043,7 +1047,6 @@ public class FairScheduler extends
               + reservedAppSchedulable.getApplicationAttemptId()
               + " on node: " + node);
         }
-
         node.getReservedAppSchedulable().assignReservedContainer(node);
       }
     }
@@ -1065,6 +1068,31 @@ public class FairScheduler extends
     updateRootQueueMetrics();
   }
 
+  /**
+   * Checks whether fulfilling a reservation of {@code additionalResource}
+   * would keep the given queue, and every ancestor of it, within its
+   * max share.
+   *
+   * @param queue the queue that holds the reservation
+   * @param additionalResource the resource the reserved container would add
+   * @return true if usage plus the reservation fits in the max share of the
+   *         queue and of all its ancestors, false otherwise
+   */
+  private boolean fitInMaxShare(FSQueue queue, Resource additionalResource) {
+    // The reservation is not part of the queue's usage yet, so count it here.
+    Resource usagePlusAddition =
+        Resources.add(queue.getResourceUsage(), additionalResource);
+    if (!Resources.fitsIn(usagePlusAddition, queue.getMaxShare())) {
+      return false;
+    }
+
+    FSQueue parentQueue = queue.getParent();
+    if (parentQueue != null) {
+      return fitInMaxShare(parentQueue, additionalResource);
+    }
+    return true;
+  }
+
   public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
     return super.getApplicationAttempt(appAttemptId);
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index ca0e954e729..19963676a89 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -722,6 +722,85 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
   }
 
+  @Test (timeout = 5000)
+  public void testContainerReservationNotExceedingQueueMax() throws Exception {
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"root\">");
+    out.println("<queue name=\"queue1\">");
+    out.println("<minResources>1024mb,5vcores</minResources>");
+    out.println("<maxResources>2048mb,10vcores</maxResources>");
+    out.println("</queue>");
+    out.println("<queue name=\"queue2\">");
+    out.println("<minResources>1024mb,5vcores</minResources>");
+    out.println("<maxResources>2048mb,10vcores</maxResources>");
+    out.println("</queue>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add a node
+    RMNode node1 =
+        MockNodes
+            .newNodeInfo(1, Resources.createResource(3072, 5), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // Queue 1 requests full capacity of the queue
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 1 is allocated app capacity
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+
+    // Now queue 2 requests likewise
+    ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user2", 1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 2 is allocated app capacity
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+
+    ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    // Make sure queue 1 is waiting with a reservation
+    assertEquals(1024, scheduler.getSchedulerApp(attId1)
+        .getCurrentReservation().getMemory());
+
+    // Now remove app of queue2
+    AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
+        attId, RMAppAttemptState.FINISHED, false);
+    scheduler.update();
+    scheduler.handle(appRemovedEvent1);
+
+    // Queue should have no apps
+    assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+
+    createSchedulingRequest(1024, "queue2", "user2", 1);
+    scheduler.handle(updateEvent);
+    // Make sure allocated memory of queue1 doesn't exceed its maximum
+    assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
+        getResourceUsage().getMemory());
+    // The reservation of queue1 should be reclaimed
+    assertEquals(0, scheduler.getSchedulerApp(attId1).
+        getCurrentReservation().getMemory());
+    assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
+        getResourceUsage().getMemory());
+  }
+
   @Test
   public void testUserAsDefaultQueue() throws Exception {
     conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");