From 30d56fdbb40d06c4e267d6c314c8c767a7adc6a3 Mon Sep 17 00:00:00 2001 From: Jian He Date: Tue, 7 Oct 2014 13:43:12 -0700 Subject: [PATCH] YARN-1857. CapacityScheduler headroom doesn't account for other AM's running. Contributed by Chen He and Craig Welch --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/LeafQueue.java | 31 ++-- .../scheduler/capacity/TestLeafQueue.java | 146 +++++++++++++++++- 3 files changed, 168 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 5cb2fc7305a..4bd92415398 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -585,6 +585,9 @@ Release 2.6.0 - UNRELEASED YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. (Craig Welch via jianhe) + YARN-1857. CapacityScheduler headroom doesn't account for other AM's running. + (Chen He and Craig Welch via jianhe) + Release 2.5.1 - 2014-09-05 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 57f0907eb59..cab031816cc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -981,11 +981,28 @@ public class LeafQueue implements CSQueue { private Resource getHeadroom(User user, Resource queueMaxCap, Resource clusterResource, Resource userLimit) { + /** + * Headroom is: + * min( + * min(userLimit, queueMaxCap) - userConsumed, + * queueMaxCap - queueUsedResources + * ) + * + * ( which can be expressed as, + * min (userLimit - userConsumed, queuMaxCap - userConsumed, + * queueMaxCap - queueUsedResources) + * ) + * + * given that queueUsedResources >= userConsumed, this simplifies to + * + * >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) << + * + */ Resource headroom = - Resources.subtract( - Resources.min(resourceCalculator, clusterResource, - userLimit, queueMaxCap), - user.getConsumedResources()); + Resources.min(resourceCalculator, clusterResource, + Resources.subtract(userLimit, user.getConsumedResources()), + Resources.subtract(queueMaxCap, usedResources) + ); return headroom; } @@ -1051,16 +1068,12 @@ public class LeafQueue implements CSQueue { @Lock({LeafQueue.class, FiCaSchedulerApp.class}) - private Resource computeUserLimitAndSetHeadroom( + Resource computeUserLimitAndSetHeadroom( FiCaSchedulerApp application, Resource clusterResource, Resource required) { String user = application.getUser(); User queueUser = getUser(user); - - /** - * Headroom is min((userLimit, queue-max-cap) - consumed) - */ Resource userLimit = // User limit computeUserLimit(application, clusterResource, required, queueUser); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 9e06c524377..92080824a87 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -215,6 +215,7 @@ public class TestLeafQueue { conf.setCapacity(Q_E, 1); conf.setMaximumCapacity(Q_E, 1); conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e"); + } static LeafQueue stubLeafQueue(LeafQueue queue) { @@ -638,7 +639,146 @@ public class TestLeafQueue { assertEquals(2*GB, app_0.getCurrentConsumption().getMemory()); assertEquals(2*GB, app_1.getCurrentConsumption().getMemory()); } - + + @Test + public void testComputeUserLimitAndSetHeadroom(){ + LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B)); + qb.setMaxCapacity(1.0f); + // Users + final String user_0 = "user_0"; + final String user_1 = "user_1"; + + //create nodes + String host_0 = "127.0.0.1"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB); + String host_1 = "127.0.0.2"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + //our test plan contains three cases + //1. single user dominate the queue, we test the headroom + //2. two users, but user_0 is assigned 100% of the queue resource, + // submit user_1's application, check headroom correctness + //3. two users, each is assigned 50% of the queue resource + // each user submit one application and check their headrooms + //4. similarly to 3. but user_0 has no quote left and there are + // free resources left, check headroom + + //test case 1 + qb.setUserLimit(100); + qb.setUserLimitFactor(1); + + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, qb, + qb.getActiveUsersManager(), rmContext); + qb.submitApplicationAttempt(app_0, user_0); + Priority u0Priority = TestUtils.createMockPriority(1); + app_0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + u0Priority, recordFactory))); + + assertEquals("There should only be 1 active user!", + 1, qb.getActiveUsersManager().getNumActiveUsers()); + //get headroom + qb.assignContainers(clusterResource, node_0, false); + qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, + app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + //maxqueue 16G, userlimit 13G, - 4G used = 9G + assertEquals(9*GB,app_0.getHeadroom().getMemory()); + + //test case 2 + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_1, qb, + qb.getActiveUsersManager(), rmContext); + Priority u1Priority = TestUtils.createMockPriority(2); + app_2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true, + u1Priority, recordFactory))); + qb.submitApplicationAttempt(app_2, user_1); + qb.assignContainers(clusterResource, node_1, false); + qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, + app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + + assertEquals(8*GB, qb.getUsedResources().getMemory()); + assertEquals(4*GB, app_0.getCurrentConsumption().getMemory()); + //maxqueue 16G, userlimit 13G, - 4G used = 9G BUT + //maxqueue 16G - used 8G (4 each app/user) = 8G max headroom (the new logic) + assertEquals(8*GB, app_0.getHeadroom().getMemory()); + assertEquals(4*GB, app_2.getCurrentConsumption().getMemory()); + assertEquals(8*GB, app_2.getHeadroom().getMemory()); + + //test case 3 + qb.finishApplication(app_0.getApplicationId(), user_0); + qb.finishApplication(app_2.getApplicationId(), user_1); + qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority)); + qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority)); + + qb.setUserLimit(50); + qb.setUserLimitFactor(1); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, qb, + qb.getActiveUsersManager(), rmContext); + final ApplicationAttemptId appAttemptId_3 = + TestUtils.getMockApplicationAttemptId(3, 0); + FiCaSchedulerApp app_3 = + new FiCaSchedulerApp(appAttemptId_3, user_1, qb, + qb.getActiveUsersManager(), rmContext); + app_1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + u0Priority, recordFactory))); + app_3.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true, + u1Priority, recordFactory))); + qb.submitApplicationAttempt(app_1, user_0); + qb.submitApplicationAttempt(app_3, user_1); + qb.assignContainers(clusterResource, node_0, false); + qb.assignContainers(clusterResource, node_0, false); + qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, + app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability()); + assertEquals(4*GB, qb.getUsedResources().getMemory()); + //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both) + assertEquals(5*GB, app_3.getHeadroom().getMemory()); + assertEquals(5*GB, app_1.getHeadroom().getMemory()); + //test case 4 + final ApplicationAttemptId appAttemptId_4 = + TestUtils.getMockApplicationAttemptId(4, 0); + FiCaSchedulerApp app_4 = + new FiCaSchedulerApp(appAttemptId_4, user_0, qb, + qb.getActiveUsersManager(), rmContext); + qb.submitApplicationAttempt(app_4, user_0); + app_4.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true, + u0Priority, recordFactory))); + qb.assignContainers(clusterResource, node_1, false); + qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, + app_4.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability()); + qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, + app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability()); + + + //app3 is user1, active from last test case + //maxqueue 16G, userlimit 13G, used 2G, would be headroom 10G BUT + //10G in use, so max possible headroom is 6G (new logic) + assertEquals(6*GB, app_3.getHeadroom().getMemory()); + //testcase3 still active - 2+2+6=10 + assertEquals(10*GB, qb.getUsedResources().getMemory()); + //app4 is user 0 + //maxqueue 16G, userlimit 13G, used 8G, headroom 5G + //(8G used is 6G from this test case - app4, 2 from last test case, app_1) + assertEquals(5*GB, app_4.getHeadroom().getMemory()); + } + @Test public void testUserHeadroomMultiApp() throws Exception { // Mock the queue @@ -787,7 +927,7 @@ public class TestLeafQueue { // Set user-limit a.setUserLimit(50); a.setUserLimitFactor(2); - + // Now, only user_0 should be active since he is the only one with // outstanding requests assertEquals("There should only be 1 active user!", @@ -835,7 +975,7 @@ public class TestLeafQueue { priority, recordFactory))); assertEquals(1, a.getActiveUsersManager().getNumActiveUsers()); a.assignContainers(clusterResource, node_1, false); - assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap + assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap } @Test