YARN-1857. CapacityScheduler headroom doesn't account for other AM's running. Contributed by Chen He and Craig Welch
(cherry picked from commit 30d56fdbb4
)
This commit is contained in:
parent
e835cb5c86
commit
5ff984f33f
|
@ -525,6 +525,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
|
YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
|
||||||
AM allocates. (Craig Welch via jianhe)
|
AM allocates. (Craig Welch via jianhe)
|
||||||
|
|
||||||
|
YARN-1857. CapacityScheduler headroom doesn't account for other AM's running.
|
||||||
|
(Chen He and Craig Welch via jianhe)
|
||||||
|
|
||||||
Release 2.5.1 - 2014-09-05
|
Release 2.5.1 - 2014-09-05
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -981,11 +981,28 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
private Resource getHeadroom(User user, Resource queueMaxCap,
|
private Resource getHeadroom(User user, Resource queueMaxCap,
|
||||||
Resource clusterResource, Resource userLimit) {
|
Resource clusterResource, Resource userLimit) {
|
||||||
|
/**
|
||||||
|
* Headroom is:
|
||||||
|
* min(
|
||||||
|
* min(userLimit, queueMaxCap) - userConsumed,
|
||||||
|
* queueMaxCap - queueUsedResources
|
||||||
|
* )
|
||||||
|
*
|
||||||
|
* ( which can be expressed as,
|
||||||
|
* min (userLimit - userConsumed, queuMaxCap - userConsumed,
|
||||||
|
* queueMaxCap - queueUsedResources)
|
||||||
|
* )
|
||||||
|
*
|
||||||
|
* given that queueUsedResources >= userConsumed, this simplifies to
|
||||||
|
*
|
||||||
|
* >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) <<
|
||||||
|
*
|
||||||
|
*/
|
||||||
Resource headroom =
|
Resource headroom =
|
||||||
Resources.subtract(
|
Resources.min(resourceCalculator, clusterResource,
|
||||||
Resources.min(resourceCalculator, clusterResource,
|
Resources.subtract(userLimit, user.getConsumedResources()),
|
||||||
userLimit, queueMaxCap),
|
Resources.subtract(queueMaxCap, usedResources)
|
||||||
user.getConsumedResources());
|
);
|
||||||
return headroom;
|
return headroom;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1051,17 +1068,13 @@ public class LeafQueue implements CSQueue {
|
||||||
|
|
||||||
|
|
||||||
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
|
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
|
||||||
private Resource computeUserLimitAndSetHeadroom(
|
Resource computeUserLimitAndSetHeadroom(
|
||||||
FiCaSchedulerApp application, Resource clusterResource, Resource required) {
|
FiCaSchedulerApp application, Resource clusterResource, Resource required) {
|
||||||
|
|
||||||
String user = application.getUser();
|
String user = application.getUser();
|
||||||
|
|
||||||
User queueUser = getUser(user);
|
User queueUser = getUser(user);
|
||||||
|
|
||||||
/**
|
|
||||||
* Headroom is min((userLimit, queue-max-cap) - consumed)
|
|
||||||
*/
|
|
||||||
|
|
||||||
Resource userLimit = // User limit
|
Resource userLimit = // User limit
|
||||||
computeUserLimit(application, clusterResource, required, queueUser);
|
computeUserLimit(application, clusterResource, required, queueUser);
|
||||||
|
|
||||||
|
|
|
@ -215,6 +215,7 @@ public class TestLeafQueue {
|
||||||
conf.setCapacity(Q_E, 1);
|
conf.setCapacity(Q_E, 1);
|
||||||
conf.setMaximumCapacity(Q_E, 1);
|
conf.setMaximumCapacity(Q_E, 1);
|
||||||
conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
|
conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static LeafQueue stubLeafQueue(LeafQueue queue) {
|
static LeafQueue stubLeafQueue(LeafQueue queue) {
|
||||||
|
@ -639,6 +640,145 @@ public class TestLeafQueue {
|
||||||
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testComputeUserLimitAndSetHeadroom(){
|
||||||
|
LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
|
||||||
|
qb.setMaxCapacity(1.0f);
|
||||||
|
// Users
|
||||||
|
final String user_0 = "user_0";
|
||||||
|
final String user_1 = "user_1";
|
||||||
|
|
||||||
|
//create nodes
|
||||||
|
String host_0 = "127.0.0.1";
|
||||||
|
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
|
||||||
|
String host_1 = "127.0.0.2";
|
||||||
|
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
|
||||||
|
|
||||||
|
final int numNodes = 2;
|
||||||
|
Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
|
||||||
|
//our test plan contains three cases
|
||||||
|
//1. single user dominate the queue, we test the headroom
|
||||||
|
//2. two users, but user_0 is assigned 100% of the queue resource,
|
||||||
|
// submit user_1's application, check headroom correctness
|
||||||
|
//3. two users, each is assigned 50% of the queue resource
|
||||||
|
// each user submit one application and check their headrooms
|
||||||
|
//4. similarly to 3. but user_0 has no quote left and there are
|
||||||
|
// free resources left, check headroom
|
||||||
|
|
||||||
|
//test case 1
|
||||||
|
qb.setUserLimit(100);
|
||||||
|
qb.setUserLimitFactor(1);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app_0 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
|
||||||
|
qb.getActiveUsersManager(), rmContext);
|
||||||
|
qb.submitApplicationAttempt(app_0, user_0);
|
||||||
|
Priority u0Priority = TestUtils.createMockPriority(1);
|
||||||
|
app_0.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
|
||||||
|
u0Priority, recordFactory)));
|
||||||
|
|
||||||
|
assertEquals("There should only be 1 active user!",
|
||||||
|
1, qb.getActiveUsersManager().getNumActiveUsers());
|
||||||
|
//get headroom
|
||||||
|
qb.assignContainers(clusterResource, node_0, false);
|
||||||
|
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
|
||||||
|
app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability());
|
||||||
|
|
||||||
|
//maxqueue 16G, userlimit 13G, - 4G used = 9G
|
||||||
|
assertEquals(9*GB,app_0.getHeadroom().getMemory());
|
||||||
|
|
||||||
|
//test case 2
|
||||||
|
final ApplicationAttemptId appAttemptId_2 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(2, 0);
|
||||||
|
FiCaSchedulerApp app_2 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
|
||||||
|
qb.getActiveUsersManager(), rmContext);
|
||||||
|
Priority u1Priority = TestUtils.createMockPriority(2);
|
||||||
|
app_2.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
|
||||||
|
u1Priority, recordFactory)));
|
||||||
|
qb.submitApplicationAttempt(app_2, user_1);
|
||||||
|
qb.assignContainers(clusterResource, node_1, false);
|
||||||
|
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
|
||||||
|
app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability());
|
||||||
|
|
||||||
|
assertEquals(8*GB, qb.getUsedResources().getMemory());
|
||||||
|
assertEquals(4*GB, app_0.getCurrentConsumption().getMemory());
|
||||||
|
//maxqueue 16G, userlimit 13G, - 4G used = 9G BUT
|
||||||
|
//maxqueue 16G - used 8G (4 each app/user) = 8G max headroom (the new logic)
|
||||||
|
assertEquals(8*GB, app_0.getHeadroom().getMemory());
|
||||||
|
assertEquals(4*GB, app_2.getCurrentConsumption().getMemory());
|
||||||
|
assertEquals(8*GB, app_2.getHeadroom().getMemory());
|
||||||
|
|
||||||
|
//test case 3
|
||||||
|
qb.finishApplication(app_0.getApplicationId(), user_0);
|
||||||
|
qb.finishApplication(app_2.getApplicationId(), user_1);
|
||||||
|
qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority));
|
||||||
|
qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority));
|
||||||
|
|
||||||
|
qb.setUserLimit(50);
|
||||||
|
qb.setUserLimitFactor(1);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId_1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app_1 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
|
||||||
|
qb.getActiveUsersManager(), rmContext);
|
||||||
|
final ApplicationAttemptId appAttemptId_3 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(3, 0);
|
||||||
|
FiCaSchedulerApp app_3 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
|
||||||
|
qb.getActiveUsersManager(), rmContext);
|
||||||
|
app_1.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
|
||||||
|
u0Priority, recordFactory)));
|
||||||
|
app_3.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
|
||||||
|
u1Priority, recordFactory)));
|
||||||
|
qb.submitApplicationAttempt(app_1, user_0);
|
||||||
|
qb.submitApplicationAttempt(app_3, user_1);
|
||||||
|
qb.assignContainers(clusterResource, node_0, false);
|
||||||
|
qb.assignContainers(clusterResource, node_0, false);
|
||||||
|
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
|
||||||
|
app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability());
|
||||||
|
assertEquals(4*GB, qb.getUsedResources().getMemory());
|
||||||
|
//maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
|
||||||
|
assertEquals(5*GB, app_3.getHeadroom().getMemory());
|
||||||
|
assertEquals(5*GB, app_1.getHeadroom().getMemory());
|
||||||
|
//test case 4
|
||||||
|
final ApplicationAttemptId appAttemptId_4 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(4, 0);
|
||||||
|
FiCaSchedulerApp app_4 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
|
||||||
|
qb.getActiveUsersManager(), rmContext);
|
||||||
|
qb.submitApplicationAttempt(app_4, user_0);
|
||||||
|
app_4.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
|
||||||
|
u0Priority, recordFactory)));
|
||||||
|
qb.assignContainers(clusterResource, node_1, false);
|
||||||
|
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
|
||||||
|
app_4.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability());
|
||||||
|
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
|
||||||
|
app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability());
|
||||||
|
|
||||||
|
|
||||||
|
//app3 is user1, active from last test case
|
||||||
|
//maxqueue 16G, userlimit 13G, used 2G, would be headroom 10G BUT
|
||||||
|
//10G in use, so max possible headroom is 6G (new logic)
|
||||||
|
assertEquals(6*GB, app_3.getHeadroom().getMemory());
|
||||||
|
//testcase3 still active - 2+2+6=10
|
||||||
|
assertEquals(10*GB, qb.getUsedResources().getMemory());
|
||||||
|
//app4 is user 0
|
||||||
|
//maxqueue 16G, userlimit 13G, used 8G, headroom 5G
|
||||||
|
//(8G used is 6G from this test case - app4, 2 from last test case, app_1)
|
||||||
|
assertEquals(5*GB, app_4.getHeadroom().getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUserHeadroomMultiApp() throws Exception {
|
public void testUserHeadroomMultiApp() throws Exception {
|
||||||
// Mock the queue
|
// Mock the queue
|
||||||
|
@ -835,7 +975,7 @@ public class TestLeafQueue {
|
||||||
priority, recordFactory)));
|
priority, recordFactory)));
|
||||||
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
|
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
|
||||||
a.assignContainers(clusterResource, node_1, false);
|
a.assignContainers(clusterResource, node_1, false);
|
||||||
assertEquals(1*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
|
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue