YARN-474. Fix CapacityScheduler to trigger application-activation when am-resource-percent configuration is refreshed. Contributed by Zhijie Shen.

svn merge --ignore-ancestry -c 1461402 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1461729 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-03-27 17:26:36 +00:00
parent dd3f9b38de
commit 10ff818f30
3 changed files with 67 additions and 2 deletions

View File

@ -91,6 +91,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-496. Fair scheduler configs are refreshed inconsistently in
reinitialize. (Sandy Ryza via tomwhite)
YARN-474. Fix CapacityScheduler to trigger application-activation when
am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -607,6 +607,10 @@ public synchronized void reinitialize(
newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
activateApplications();
}
@Override

View File

@ -138,6 +138,7 @@ public void setUp() throws Exception {
private static final String C = "c";
private static final String C1 = "c1";
private static final String D = "d";
private static final String E = "e";
private void setupQueueConfiguration(
CapacitySchedulerConfiguration conf,
final String newRoot) {
@ -148,7 +149,7 @@ private void setupQueueConfiguration(
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
conf.setQueues(Q_newRoot, new String[] {A, B, C, D});
conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
conf.setCapacity(Q_newRoot, 100);
conf.setMaximumCapacity(Q_newRoot, 100);
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
@ -174,10 +175,14 @@ private void setupQueueConfiguration(
conf.setCapacity(Q_C1, 100);
final String Q_D = Q_newRoot + "." + D;
conf.setCapacity(Q_D, 10);
conf.setCapacity(Q_D, 9);
conf.setMaximumCapacity(Q_D, 11);
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
final String Q_E = Q_newRoot + "." + E;
conf.setCapacity(Q_E, 1);
conf.setMaximumCapacity(Q_E, 1);
conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
}
static LeafQueue stubLeafQueue(LeafQueue queue) {
@ -1567,6 +1572,59 @@ public void testSchedulingConstraints() throws Exception {
}
@Test (timeout = 30000)
public void testActivateApplicationAfterQueueRefresh() throws Exception {
// Manipulate queue 'e'
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
// Users
final String user_e = "user_e";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_e, e,
mock(ActiveUsersManager.class), rmContext);
e.submitApplication(app_0, user_e, E);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_e, e,
mock(ActiveUsersManager.class), rmContext);
e.submitApplication(app_1, user_e, E); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_e, e,
mock(ActiveUsersManager.class), rmContext);
e.submitApplication(app_2, user_e, E); // same user
// before reinitialization
assertEquals(2, e.activeApplications.size());
assertEquals(1, e.pendingApplications.size());
csConf.setDouble(CapacitySchedulerConfiguration
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
CapacitySchedulerConfiguration
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResources());
// after reinitialization
assertEquals(3, e.activeApplications.size());
assertEquals(0, e.pendingApplications.size());
}
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
for (QueueUserACLInfo aclInfo : aclInfos) {
if (aclInfo.getUserAcls().contains(acl)) {