From d0bbff6c32592cb5d49d7be8d8a7346788a9ba19 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 27 Mar 2013 18:38:28 +0000 Subject: [PATCH] YARN-209. Fix CapacityScheduler to trigger application-activation when the cluster capacity changes. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461773 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 ++ .../scheduler/capacity/LeafQueue.java | 6 ++- .../yarn/server/resourcemanager/TestRM.java | 47 +++++++++++++++++++ .../scheduler/capacity/TestLeafQueue.java | 43 +++++++++++++++++ 4 files changed, 98 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index a5dc6865868..77ae483207d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -149,6 +149,9 @@ Release 2.0.5-beta - UNRELEASED YARN-496. Fair scheduler configs are refreshed inconsistently in reinitialize. (Sandy Ryza via tomwhite) + YARN-209. Fix CapacityScheduler to trigger application-activation when + the cluster capacity changes. (Zhijie Shen via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 1785ec565a6..efc6e3efb25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -1481,7 +1481,11 @@ public class LeafQueue implements CSQueue { CSQueueUtils.updateQueueStatistics( resourceCalculator, this, getParent(), clusterResource, minimumAllocation); - + + // queue metrics are updated, more resource may be available + // activate the pending applications if possible + activateApplications(); + // Update application properties for (FiCaSchedulerApp application : activeApplications) { synchronized (application) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 1a161268474..939443bc5ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -26,10 +26,12 @@ import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; @@ -135,6 +137,51 @@ public class TestRM { rm.stop(); } + @Test (timeout = 30000) + public void testActivatingApplicationAfterAddingNM() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + + MockRM rm1 = new MockRM(conf); + + // start like normal because state is empty + rm1.start(); + + // app that gets launched + RMApp app1 = rm1.submitApp(200); + + // app that does not get launched + RMApp app2 = rm1.submitApp(200); + + // app1 and app2 should be scheduled, but because no resource is available, + // they are not activated. + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + ApplicationAttemptId attemptId1 = attempt1.getAppAttemptId(); + rm1.waitForState(attemptId1, RMAppAttemptState.SCHEDULED); + RMAppAttempt attempt2 = app2.getCurrentAppAttempt(); + ApplicationAttemptId attemptId2 = attempt2.getAppAttemptId(); + rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED); + + MockNM nm1 = new MockNM("h1:1234", 15120, rm1.getResourceTrackerService()); + MockNM nm2 = new MockNM("h2:5678", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + nm2.registerNode(); + + //kick the scheduling + nm1.nodeHeartbeat(true); + + // app1 should be allocated now + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + rm1.waitForState(attemptId2, RMAppAttemptState.SCHEDULED); + + nm2.nodeHeartbeat(true); + + // app2 should be allocated now + rm1.waitForState(attemptId1, RMAppAttemptState.ALLOCATED); + rm1.waitForState(attemptId2, RMAppAttemptState.ALLOCATED); + + rm1.stop(); + } + public static void main(String[] args) throws Exception { TestRM t = new TestRM(); t.testGetNewAppId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 0460b3f6ad3..828c796d011 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1625,6 +1625,49 @@ public class TestLeafQueue { assertEquals(0, e.pendingApplications.size()); } + @Test (timeout = 30000) + public void testActivateApplicationByUpdatingClusterResource() + throws Exception { + + // Manipulate queue 'e' + LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E)); + + // Users + final String user_e = "user_e"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_0, user_e, E); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_1, user_e, E); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_2, user_e, E); // same user + + // before updating cluster resource + assertEquals(2, e.activeApplications.size()); + assertEquals(1, e.pendingApplications.size()); + + e.updateClusterResource(Resources.createResource(200 * 16 * GB, 100 * 32)); + + // after updating cluster resource + assertEquals(3, e.activeApplications.size()); + assertEquals(0, e.pendingApplications.size()); + } + public boolean hasQueueACL(List aclInfos, QueueACL acl) { for (QueueUserACLInfo aclInfo : aclInfos) { if (aclInfo.getUserAcls().contains(acl)) {