YARN-3101. In Fair Scheduler, fix canceling of reservations for exceeding max share (Anubhav Dhoot via Sandy Ryza)

This commit is contained in:
Sandy Ryza 2015-02-05 09:35:47 -08:00
parent 2dc4af2b93
commit 84df660af4
4 changed files with 109 additions and 25 deletions

View File

@ -448,6 +448,9 @@ Release 2.7.0 - UNRELEASED
YARN-3058. Fix error message of tokens' activation delay configuration. YARN-3058. Fix error message of tokens' activation delay configuration.
(Yi Liu via ozawa) (Yi Liu via ozawa)
YARN-3101. In Fair Scheduler, fix canceling of reservations for exceeding
max share (Anubhav Dhoot via Sandy Ryza)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -530,6 +530,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return container.getResource(); return container.getResource();
} else { } else {
if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
return Resources.none();
}
// The desired container won't fit here, so reserve // The desired container won't fit here, so reserve
reserve(request.getPriority(), node, container, reserved); reserve(request.getPriority(), node, container, reserved);

View File

@ -1049,7 +1049,8 @@ public class FairScheduler extends
FSQueue queue = reservedAppSchedulable.getQueue(); FSQueue queue = reservedAppSchedulable.getQueue();
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node) if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
|| !fitInMaxShare(queue)) { || !fitsInMaxShare(queue,
node.getReservedContainer().getReservedResource())) {
// Don't hold the reservation if app can no longer use it // Don't hold the reservation if app can no longer use it
LOG.info("Releasing reservation that cannot be satisfied for application " LOG.info("Releasing reservation that cannot be satisfied for application "
+ reservedAppSchedulable.getApplicationAttemptId() + reservedAppSchedulable.getApplicationAttemptId()
@ -1084,14 +1085,18 @@ public class FairScheduler extends
updateRootQueueMetrics(); updateRootQueueMetrics();
} }
private boolean fitInMaxShare(FSQueue queue) { static boolean fitsInMaxShare(FSQueue queue, Resource
if (Resources.fitsIn(queue.getResourceUsage(), queue.getMaxShare())) { additionalResource) {
Resource usagePlusAddition =
Resources.add(queue.getResourceUsage(), additionalResource);
if (!Resources.fitsIn(usagePlusAddition, queue.getMaxShare())) {
return false; return false;
} }
FSQueue parentQueue = queue.getParent(); FSQueue parentQueue = queue.getParent();
if (parentQueue != null) { if (parentQueue != null) {
return fitInMaxShare(parentQueue); return fitsInMaxShare(parentQueue, additionalResource);
} }
return true; return true;
} }

View File

@ -784,19 +784,18 @@ public class TestFairScheduler extends FairSchedulerTestBase {
} }
@Test (timeout = 5000) @Test (timeout = 500000)
public void testContainerReservationNotExceedingQueueMax() throws Exception { public void testContainerReservationAttemptExceedingQueueMax()
throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>"); out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>"); out.println("<allocations>");
out.println("<queue name=\"root\">"); out.println("<queue name=\"root\">");
out.println("<queue name=\"queue1\">"); out.println("<queue name=\"queue1\">");
out.println("<minResources>1024mb,5vcores</minResources>"); out.println("<maxResources>2048mb,5vcores</maxResources>");
out.println("<maxResources>2048mb,10vcores</maxResources>");
out.println("</queue>"); out.println("</queue>");
out.println("<queue name=\"queue2\">"); out.println("<queue name=\"queue2\">");
out.println("<minResources>1024mb,5vcores</minResources>");
out.println("<maxResources>2048mb,10vcores</maxResources>"); out.println("<maxResources>2048mb,10vcores</maxResources>");
out.println("</queue>"); out.println("</queue>");
out.println("</queue>"); out.println("</queue>");
@ -825,7 +824,64 @@ public class TestFairScheduler extends FairSchedulerTestBase {
getResourceUsage().getMemory()); getResourceUsage().getMemory());
// Now queue 2 requests likewise // Now queue 2 requests likewise
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user2", 1); createSchedulingRequest(1024, "queue2", "user2", 1);
scheduler.update();
scheduler.handle(updateEvent);
// Make sure queue 2 is allocated app capacity
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
getResourceUsage().getMemory());
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
// Ensure the reservation does not get created as allocated memory of
// queue1 exceeds max
assertEquals(0, scheduler.getSchedulerApp(attId1).
getCurrentReservation().getMemory());
}
@Test (timeout = 500000)
public void testContainerReservationNotExceedingQueueMax() throws Exception {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println("<queue name=\"queue1\">");
out.println("<maxResources>3072mb,10vcores</maxResources>");
out.println("</queue>");
out.println("<queue name=\"queue2\">");
out.println("<maxResources>2048mb,10vcores</maxResources>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(3072, 5), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Queue 1 requests full capacity of the queue
createSchedulingRequest(2048, "queue1", "user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// Make sure queue 1 is allocated app capacity
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory());
// Now queue 2 requests likewise
createSchedulingRequest(1024, "queue2", "user2", 1);
scheduler.update(); scheduler.update();
scheduler.handle(updateEvent); scheduler.handle(updateEvent);
@ -841,18 +897,34 @@ public class TestFairScheduler extends FairSchedulerTestBase {
assertEquals(1024, scheduler.getSchedulerApp(attId1) assertEquals(1024, scheduler.getSchedulerApp(attId1)
.getCurrentReservation().getMemory()); .getCurrentReservation().getMemory());
// Now remove app of queue2 // Exercise checks that reservation fits
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( scheduler.handle(updateEvent);
attId, RMAppAttemptState.FINISHED, false);
scheduler.update();
scheduler.handle(appRemovedEvent1);
// Queue should have no apps // Ensure the reservation still exists as allocated memory of queue1 doesn't
assertEquals(0, scheduler.getQueueManager().getQueue("queue2"). // exceed max
getResourceUsage().getMemory()); assertEquals(1024, scheduler.getSchedulerApp(attId1).
getCurrentReservation().getMemory());
// Now reduce max Resources of queue1 down to 2048
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println("<queue name=\"queue1\">");
out.println("<maxResources>2048mb,10vcores</maxResources>");
out.println("</queue>");
out.println("<queue name=\"queue2\">");
out.println("<maxResources>2048mb,10vcores</maxResources>");
out.println("</queue>");
out.println("</queue>");
out.println("</allocations>");
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
createSchedulingRequest(1024, "queue2", "user2", 1); createSchedulingRequest(1024, "queue2", "user2", 1);
scheduler.handle(updateEvent); scheduler.handle(updateEvent);
// Make sure allocated memory of queue1 doesn't exceed its maximum // Make sure allocated memory of queue1 doesn't exceed its maximum
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1"). assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory()); getResourceUsage().getMemory());
@ -2257,10 +2329,9 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.handle(updateEvent); scheduler.handle(updateEvent);
assertEquals(1, app.getLiveContainers().size()); assertEquals(1, app.getLiveContainers().size());
// Reserved container should will be at higher priority, // Reserved container should still be at lower priority
// since old reservation cannot be satisfied
for (RMContainer container : app.getReservedContainers()) { for (RMContainer container : app.getReservedContainers()) {
assertEquals(1, container.getReservedPriority().getPriority()); assertEquals(2, container.getReservedPriority().getPriority());
} }
// Complete container // Complete container
@ -2273,11 +2344,12 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.update(); scheduler.update();
scheduler.handle(updateEvent); scheduler.handle(updateEvent);
// Reserved container (at higher priority) should be run // Reserved container (at lower priority) should be run
Collection<RMContainer> liveContainers = app.getLiveContainers(); Collection<RMContainer> liveContainers = app.getLiveContainers();
assertEquals(1, liveContainers.size()); assertEquals(1, liveContainers.size());
for (RMContainer liveContainer : liveContainers) { for (RMContainer liveContainer : liveContainers) {
Assert.assertEquals(1, liveContainer.getContainer().getPriority().getPriority()); Assert.assertEquals(2, liveContainer.getContainer().getPriority()
.getPriority());
} }
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores()); assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());