YARN-4105. Capacity Scheduler headroom for DRF is wrong. Contributed by Chang Li

(cherry picked from commit 6eaca2e363)
This commit is contained in:
Jason Lowe 2015-09-04 15:30:53 +00:00
parent 5002f885fc
commit 71172a0bfc
3 changed files with 115 additions and 2 deletions

View File

@ -70,6 +70,9 @@ Release 2.7.2 - UNRELEASED
YARN-4103. RM WebServices missing scheme for appattempts logLinks.
(Jonathan Eagles via vvasudeb)
YARN-4105. Capacity Scheduler headroom for DRF is wrong (Chang Li via
jlowe)
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES

View File

@ -931,7 +931,7 @@ public class LeafQueue extends AbstractCSQueue {
* *
*/ */
Resource headroom = Resource headroom =
Resources.min(resourceCalculator, clusterResource, Resources.componentwiseMin(
Resources.subtract(userLimit, user.getUsed()), Resources.subtract(userLimit, user.getUsed()),
Resources.subtract(currentResourceLimit, queueUsage.getUsed()) Resources.subtract(currentResourceLimit, queueUsage.getUsed())
); );

View File

@ -2626,6 +2626,116 @@ public class TestCapacityScheduler {
rm.stop(); rm.stop();
} }
@Test
public void testHeadRoomCalculationWithDRC() throws Exception {
// test with total cluster resource of 20GB memory and 20 vcores.
// the queue where two apps running has user limit 0.8
// allocate 10GB memory and 1 vcore to app 1.
// app 1 should have headroom
// 20GB*0.8 - 10GB = 6GB memory available and 15 vcores.
// allocate 1GB memory and 1 vcore to app2.
// app 2 should have headroom 20GB - 10 - 1 = 1GB memory,
// and 20*0.8 - 1 = 15 vcores.
CapacitySchedulerConfiguration csconf =
new CapacitySchedulerConfiguration();
csconf.setResourceComparator(DominantResourceCalculator.class);
YarnConfiguration conf = new YarnConfiguration(csconf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue qb = (LeafQueue)cs.getQueue("default");
qb.setUserLimitFactor((float)0.8);
// add app 1
ApplicationId appId = BuilderUtils.newApplicationId(100, 1);
ApplicationAttemptId appAttemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
RMAppAttemptMetrics attemptMetric =
new RMAppAttemptMetrics(appAttemptId, rm.getRMContext());
RMAppImpl app = mock(RMAppImpl.class);
when(app.getApplicationId()).thenReturn(appId);
RMAppAttemptImpl attempt = mock(RMAppAttemptImpl.class);
when(attempt.getAppAttemptId()).thenReturn(appAttemptId);
when(attempt.getRMAppAttemptMetrics()).thenReturn(attemptMetric);
when(app.getCurrentAppAttempt()).thenReturn(attempt);
rm.getRMContext().getRMApps().put(appId, app);
SchedulerEvent addAppEvent =
new AppAddedSchedulerEvent(appId, "default", "user1");
cs.handle(addAppEvent);
SchedulerEvent addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId, false);
cs.handle(addAttemptEvent);
// add app 2
ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
ApplicationAttemptId appAttemptId2 =
BuilderUtils.newApplicationAttemptId(appId2, 1);
RMAppAttemptMetrics attemptMetric2 =
new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext());
RMAppImpl app2 = mock(RMAppImpl.class);
when(app2.getApplicationId()).thenReturn(appId2);
RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
rm.getRMContext().getRMApps().put(appId2, app2);
addAppEvent =
new AppAddedSchedulerEvent(appId2, "default", "user2");
cs.handle(addAppEvent);
addAttemptEvent =
new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
cs.handle(addAttemptEvent);
// add nodes to cluster, so cluster have 20GB and 20 vcores
Resource newResource = Resource.newInstance(10 * GB, 10);
RMNode node = MockNodes.newNodeInfo(0, newResource, 1, "127.0.0.1");
cs.handle(new NodeAddedSchedulerEvent(node));
Resource newResource2 = Resource.newInstance(10 * GB, 10);
RMNode node2 = MockNodes.newNodeInfo(0, newResource2, 1, "127.0.0.2");
cs.handle(new NodeAddedSchedulerEvent(node2));
FiCaSchedulerApp fiCaApp1 =
cs.getSchedulerApplications().get(app.getApplicationId())
.getCurrentAppAttempt();
FiCaSchedulerApp fiCaApp2 =
cs.getSchedulerApplications().get(app2.getApplicationId())
.getCurrentAppAttempt();
Priority u0Priority = TestUtils.createMockPriority(1);
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// allocate container for app1 with 10GB memory and 1 vcore
fiCaApp1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 10*GB, 1, true,
u0Priority, recordFactory)));
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
assertEquals(6*GB, fiCaApp1.getHeadroom().getMemory());
assertEquals(15, fiCaApp1.getHeadroom().getVirtualCores());
// allocate container for app2 with 1GB memory and 1 vcore
fiCaApp2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
u0Priority, recordFactory)));
cs.handle(new NodeUpdateSchedulerEvent(node));
cs.handle(new NodeUpdateSchedulerEvent(node2));
assertEquals(9*GB, fiCaApp2.getHeadroom().getMemory());
assertEquals(15, fiCaApp2.getHeadroom().getVirtualCores());
}
@Test @Test
public void testDefaultNodeLabelExpressionQueueConfig() throws Exception { public void testDefaultNodeLabelExpressionQueueConfig() throws Exception {
CapacityScheduler cs = new CapacityScheduler(); CapacityScheduler cs = new CapacityScheduler();