From 721d7b574126c4070322f70ec5b49a7b8558a4c7 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 4 Mar 2015 18:06:36 -0800 Subject: [PATCH] YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha) (cherry picked from commit 22426a1c9f4bd616558089b6862fd34ab42d19a7) --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FairScheduler.java | 1 + .../fair/MaxRunningAppsEnforcer.java | 40 ++- .../scheduler/fair/TestFairScheduler.java | 310 +++++++++++++++++- 4 files changed, 348 insertions(+), 6 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 92c44e908b3..e648e3011b0 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -655,6 +655,9 @@ Release 2.7.0 - UNRELEASED YARN-3131. YarnClientImpl should check FAILED and KILLED state in submitApplication (Chang Li via jlowe) + + YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending + jobs. (Siqi Li via kasha) Release 2.6.0 - 2014-11-18 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2b597164baf..e8a955534e3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1477,6 +1477,7 @@ public class FairScheduler extends allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); + maxRunningEnforcer.updateRunnabilityOnReload(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 2c90edd400a..f75043814d0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -104,6 +104,26 @@ public class MaxRunningAppsEnforcer { usersNonRunnableApps.put(user, app); } + /** + * This is called after reloading the allocation configuration when the + * scheduler is reinitilized + * + * Checks to see whether any non-runnable applications become runnable + * now that the max running apps of given queue has been changed + * + * Runs in O(n) where n is the number of apps that are non-runnable and in + * the queues that went from having no slack to having slack. + */ + public void updateRunnabilityOnReload() { + FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue(); + List> appsNowMaybeRunnable = + new ArrayList>(); + + gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable); + + updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE); + } + /** * Checks to see whether any other applications runnable now that the given * application has been removed from the given queue. And makes them so. @@ -156,6 +176,19 @@ public class MaxRunningAppsEnforcer { } } + updateAppsRunnability(appsNowMaybeRunnable, + appsNowMaybeRunnable.size()); + } + + /** + * Checks to see whether applications are runnable now by iterating + * through each one of them and check if the queue and user have slack + * + * if we know how many apps can be runnable, there is no need to iterate + * through all apps, maxRunnableApps is used to break out of the iteration + */ + private void updateAppsRunnability(List> + appsNowMaybeRunnable, int maxRunnableApps) { // Scan through and check whether this means that any apps are now runnable Iterator iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); @@ -173,9 +206,7 @@ public class MaxRunningAppsEnforcer { next.getQueue().addApp(appSched, true); noLongerPendingApps.add(appSched); - // No more than one app per list will be able to be made runnable, so - // we can stop looking after we've found that many - if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) { + if (noLongerPendingApps.size() >= maxRunnableApps) { break; } } @@ -194,11 +225,10 @@ public class MaxRunningAppsEnforcer { if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " - + "usersNonRunnableApps, but was not. This should never happen."); + + "usersNonRunnableApps, but was not. This should never happen."); } } } - /** * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index f67357abbda..854ba778414 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2288,7 +2288,315 @@ public class TestFairScheduler extends FairSchedulerTestBase { // Request should be fulfilled assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); } - + + @Test (timeout = 5000) + public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "" + + "" + + "" + + "" + + "1" + + "" + + "" + + ""; + + String allocAfter = "" + + "" + + "" + + "" + + "3" + + "" + + "" + + ""; + + testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + @Test (timeout = 5000) + public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "1"+ + ""+ + ""; + + String allocAfter = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "3"+ + ""+ + ""; + + testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore, + String allocAfter) throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocBefore); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocAfter); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + + @Test (timeout = 5000) + public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "" + + "" + + "" + + "" + + "3" + + "" + + "" + + ""; + + String allocAfter = "" + + "" + + "" + + "" + + "1" + + "" + + "" + + ""; + + testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + @Test (timeout = 5000) + public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "3"+ + ""+ + ""; + + String allocAfter = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "1"+ + ""+ + ""; + + testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore, + String allocAfter) throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocBefore); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocAfter); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should still be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should still be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 2 + appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId2, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 3 + appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId3, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running now + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { scheduler.init(conf);