diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 751d7124632..fef462a07a3 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -70,6 +70,9 @@ Release 2.7.2 - UNRELEASED YARN-4103. RM WebServices missing scheme for appattempts logLinks. (Jonathan Eagles via vvasudeb) + YARN-4105. Capacity Scheduler headroom for DRF is wrong (Chang Li via + jlowe) + Release 2.7.1 - 2015-07-06 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 59b9d217430..65061ba40e9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -931,7 +931,7 @@ public class LeafQueue extends AbstractCSQueue { * */ Resource headroom = - Resources.min(resourceCalculator, clusterResource, + Resources.componentwiseMin( Resources.subtract(userLimit, user.getUsed()), Resources.subtract(currentResourceLimit, queueUsage.getUsed()) ); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index e2e8cc59ca2..9e01ccb5630 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -2625,7 +2625,117 @@ public class TestCapacityScheduler { rm.stop(); } - + + @Test + public void testHeadRoomCalculationWithDRC() throws Exception { + // test with total cluster resource of 20GB memory and 20 vcores. + // the queue where the two apps run has a user limit factor of 0.8. + // allocate 10GB memory and 1 vcore to app 1. + // app 1 should have headroom + // 20GB*0.8 - 10GB = 6GB memory available and 15 vcores. + // allocate 1GB memory and 1 vcore to app2. + // app 2 should have headroom 20GB - 10GB - 1GB = 9GB memory, + // and 20*0.8 - 1 = 15 vcores. + + CapacitySchedulerConfiguration csconf = + new CapacitySchedulerConfiguration(); + csconf.setResourceComparator(DominantResourceCalculator.class); + + YarnConfiguration conf = new YarnConfiguration(csconf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + LeafQueue qb = (LeafQueue)cs.getQueue("default"); + qb.setUserLimitFactor((float)0.8); + + // add app 1 + ApplicationId appId = BuilderUtils.newApplicationId(100, 1); + ApplicationAttemptId appAttemptId = + BuilderUtils.newApplicationAttemptId(appId, 1); + + RMAppAttemptMetrics attemptMetric = + new RMAppAttemptMetrics(appAttemptId, rm.getRMContext()); + RMAppImpl app = mock(RMAppImpl.class); + when(app.getApplicationId()).thenReturn(appId); + RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class); + when(attempt.getAppAttemptId()).thenReturn(appAttemptId); + 
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric); + when(app.getCurrentAppAttempt()).thenReturn(attempt); + + rm.getRMContext().getRMApps().put(appId, app); + + SchedulerEvent addAppEvent = + new AppAddedSchedulerEvent(appId, "default", "user1"); + cs.handle(addAppEvent); + SchedulerEvent addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId, false); + cs.handle(addAttemptEvent); + + // add app 2 + ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2); + ApplicationAttemptId appAttemptId2 = + BuilderUtils.newApplicationAttemptId(appId2, 1); + + RMAppAttemptMetrics attemptMetric2 = + new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext()); + RMAppImpl app2 = mock(RMAppImpl.class); + when(app2.getApplicationId()).thenReturn(appId2); + RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class); + when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2); + when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2); + when(app2.getCurrentAppAttempt()).thenReturn(attempt2); + + rm.getRMContext().getRMApps().put(appId2, app2); + addAppEvent = + new AppAddedSchedulerEvent(appId2, "default", "user2"); + cs.handle(addAppEvent); + addAttemptEvent = + new AppAttemptAddedSchedulerEvent(appAttemptId2, false); + cs.handle(addAttemptEvent); + + // add nodes to the cluster, so the cluster has 20GB memory and 20 vcores + Resource newResource = Resource.newInstance(10 * GB, 10); + RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1"); + cs.handle(new NodeAddedSchedulerEvent(node)); + + Resource newResource2 = Resource.newInstance(10 * GB, 10); + RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2"); + cs.handle(new NodeAddedSchedulerEvent(node2)); + + FiCaSchedulerApp fiCaApp1 = + cs.getSchedulerApplications().get(app.getApplicationId()) + .getCurrentAppAttempt(); + + FiCaSchedulerApp fiCaApp2 = + cs.getSchedulerApplications().get(app2.getApplicationId()) + .getCurrentAppAttempt(); + Priority u0Priority = 
TestUtils.createMockPriority(1); + RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + + // allocate container for app1 with 10GB memory and 1 vcore + fiCaApp1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 10*GB, 1, true, + u0Priority, recordFactory))); + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + assertEquals(6*GB, fiCaApp1.getHeadroom().getMemory()); + assertEquals(15, fiCaApp1.getHeadroom().getVirtualCores()); + + // allocate container for app2 with 1GB memory and 1 vcore + fiCaApp2.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, + u0Priority, recordFactory))); + cs.handle(new NodeUpdateSchedulerEvent(node)); + cs.handle(new NodeUpdateSchedulerEvent(node2)); + assertEquals(9*GB, fiCaApp2.getHeadroom().getMemory()); + assertEquals(15, fiCaApp2.getHeadroom().getVirtualCores()); + } + @Test public void testDefaultNodeLabelExpressionQueueConfig() throws Exception { CapacityScheduler cs = new CapacityScheduler();