YARN-4606. CapacityScheduler: applications could get starved because computation of #activeUsers considers pending apps. Contributed by Manikandan R

(cherry picked from commit 9485c9aee6)
Eric E Payne 2018-07-25 16:22:04 +00:00
parent d2212c20c5
commit 830ef12af8
3 changed files with 197 additions and 1 deletion
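In essence, the change tracks users whose applications are all still pending separately from users with running applications, and folds that count back into getNumActiveUsers(). A minimal sketch of the idea, condensed from the UsersManager.java diff below (locking, scheduling modes, and metrics handling are omitted):

// Condensed sketch of the change, not the full implementation.
private AtomicInteger activeUsers = new AtomicInteger(0);
// New: users that only have pending apps (e.g. apps held back by the AM-resource limit).
private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);

// Recomputed under the UsersManager write lock whenever user limits are recomputed.
private void computeNumActiveUsersWithOnlyPendingApps() {
  int numPendingUsers = 0;
  for (User user : users.values()) {
    if (user.getPendingApplications() > 0
        && user.getActiveApplications() <= 0) {
      numPendingUsers++;
    }
  }
  activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
}

@Override
public int getNumActiveUsers() {
  // Users whose apps are all still pending now count as active users too.
  return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
}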

UsersManager.java

@@ -85,6 +85,7 @@ public class UsersManager implements AbstractUsersManager {
private final QueueMetrics metrics;
private AtomicInteger activeUsers = new AtomicInteger(0);
private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
@@ -671,9 +672,23 @@ public class UsersManager implements AbstractUsersManager {
// update in local storage
userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
computeNumActiveUsersWithOnlyPendingApps();
return userLimitPerSchedulingMode;
}
// This method is called within the lock.
private void computeNumActiveUsersWithOnlyPendingApps() {
int numPendingUsers = 0;
for (User user : users.values()) {
if ((user.getPendingApplications() > 0)
&& (user.getActiveApplications() <= 0)) {
numPendingUsers++;
}
}
activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
}
private Resource computeUserLimit(String userName, Resource clusterResource,
String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -839,6 +854,11 @@ public class UsersManager implements AbstractUsersManager {
try {
this.writeLock.lock();
User userDesc = getUser(user);
if (userDesc != null && userDesc.getActiveApplications() <= 0) {
return;
}
Set<ApplicationId> userApps = usersApplications.get(user);
if (userApps == null) {
userApps = new HashSet<ApplicationId>();
@@ -893,7 +913,7 @@ public class UsersManager implements AbstractUsersManager {
@Override
public int getNumActiveUsers() {
- return activeUsers.get();
+ return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
}
float sumActiveUsersTimesWeights() {
@@ -1090,4 +1110,9 @@ public class UsersManager implements AbstractUsersManager {
this.writeLock.unlock();
}
}
@VisibleForTesting
public int getNumActiveUsersWithOnlyPendingApps() {
return activeUsersWithOnlyPendingApps.get();
}
}

TestCapacityScheduler.java

@@ -4978,4 +4978,132 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
Assert.assertEquals(AllocationState.QUEUE_SKIPPED,
ContainerAllocation.QUEUE_SKIPPED.getAllocationState());
}
@Test
public void testMoveAppWithActiveUsersWithOnlyPendingApps() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
// Define top-level queues
newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b" });
newConf.setCapacity(A, 50);
newConf.setCapacity(B, 50);
// Define 2nd-level queues
newConf.setQueues(A, new String[] { "a1" });
newConf.setCapacity(A1, 100);
newConf.setUserLimitFactor(A1, 2.0f);
newConf.setMaximumAMResourcePercentPerPartition(A1, "", 0.1f);
newConf.setQueues(B, new String[] { "b1" });
newConf.setCapacity(B1, 100);
newConf.setUserLimitFactor(B1, 2.0f);
LOG.info("Setup top-level queues a and b");
MockRM rm = new MockRM(newConf);
rm.start();
CapacityScheduler scheduler =
(CapacityScheduler) rm.getResourceScheduler();
MockNM nm1 = rm.registerNode("h1:1234", 16 * GB);
// submit an app
RMApp app = rm.submitApp(GB, "test-move-1", "u1", null, "a1");
MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
ApplicationAttemptId appAttemptId =
rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1");
RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1");
// Each application asks 50 * 1GB containers
am1.allocate("*", 1 * GB, 50, null);
am2.allocate("*", 1 * GB, 50, null);
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
// check preconditions
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(4, appsInA1.size());
String queue =
scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue()
.getQueueName();
Assert.assertEquals("a1", queue);
List<ApplicationAttemptId> appsInA = scheduler.getAppsInQueue("a");
assertTrue(appsInA.contains(appAttemptId));
assertEquals(4, appsInA.size());
List<ApplicationAttemptId> appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(appAttemptId));
assertEquals(4, appsInRoot.size());
List<ApplicationAttemptId> appsInB1 = scheduler.getAppsInQueue("b1");
assertTrue(appsInB1.isEmpty());
List<ApplicationAttemptId> appsInB = scheduler.getAppsInQueue("b");
assertTrue(appsInB.isEmpty());
UsersManager um =
(UsersManager) scheduler.getQueue("a1").getAbstractUsersManager();
assertEquals(4, um.getNumActiveUsers());
assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
// now move the app
scheduler.moveAllApps("a1", "b1");
//Triggering this event so that user limit computation can
//happen again
for (int i = 0; i < 10; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Thread.sleep(500);
}
// check postconditions
appsInB1 = scheduler.getAppsInQueue("b1");
assertEquals(4, appsInB1.size());
queue =
scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue()
.getQueueName();
Assert.assertEquals("b1", queue);
appsInB = scheduler.getAppsInQueue("b");
assertTrue(appsInB.contains(appAttemptId));
assertEquals(4, appsInB.size());
appsInRoot = scheduler.getAppsInQueue("root");
assertTrue(appsInRoot.contains(appAttemptId));
assertEquals(4, appsInRoot.size());
List<ApplicationAttemptId> oldAppsInA1 = scheduler.getAppsInQueue("a1");
assertEquals(0, oldAppsInA1.size());
UsersManager um_b1 =
(UsersManager) scheduler.getQueue("b1").getAbstractUsersManager();
assertEquals(2, um_b1.getNumActiveUsers());
assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps());
appsInB1 = scheduler.getAppsInQueue("b1");
assertEquals(4, appsInB1.size());
rm.close();
}
}

TestContainerAllocation.java

@@ -941,4 +941,47 @@ public class TestContainerAllocation {
rm1.close();
}
@Test
public void testActiveUsersWithOnlyPendingApps() throws Exception {
CapacitySchedulerConfiguration newConf =
new CapacitySchedulerConfiguration(conf);
newConf.setMaximumAMResourcePercentPerPartition(
CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f);
MockRM rm1 = new MockRM(newConf);
rm1.getRMContext().setNodeLabelManager(mgr);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default");
RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default");
// Each application asks 50 * 1GB containers
am1.allocate("*", 1 * GB, 50, null);
am2.allocate("*", 1 * GB, 50, null);
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
for (int i = 0; i < 10; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
Thread.sleep(1000);
}
LeafQueue lq = (LeafQueue) cs.getQueue("default");
UsersManager um = (UsersManager) lq.getAbstractUsersManager();
Assert.assertEquals(4, um.getNumActiveUsers());
Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
Assert.assertEquals(2, lq.getMetrics().getAppsPending());
rm1.close();
}
}