YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max share (Siqi Li via Sandy Ryza)
This commit is contained in:
parent
ad140d1fc8
commit
c4c77669f0
|
@ -198,6 +198,9 @@ Release 2.6.0 - 2014-11-18
|
|||
YARN-2505. Supported get/add/remove/change labels in RM REST API. (Craig Welch
|
||||
via zjshen)
|
||||
|
||||
YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
|
||||
share (Siqi Li via Sandy Ryza)
|
||||
|
||||
IMPROVEMENTS
|
||||
|
||||
YARN-2242. Improve exception information on AM launch crashes. (Li Lu
|
||||
|
|
|
@ -1029,7 +1029,10 @@ public class FairScheduler extends
|
|||
FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
|
||||
if (reservedAppSchedulable != null) {
|
||||
Priority reservedPriority = node.getReservedContainer().getReservedPriority();
|
||||
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)) {
|
||||
FSQueue queue = reservedAppSchedulable.getQueue();
|
||||
|
||||
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
|
||||
|| !fitInMaxShare(queue)) {
|
||||
// Don't hold the reservation if app can no longer use it
|
||||
LOG.info("Releasing reservation that cannot be satisfied for application "
|
||||
+ reservedAppSchedulable.getApplicationAttemptId()
|
||||
|
@ -1043,7 +1046,6 @@ public class FairScheduler extends
|
|||
+ reservedAppSchedulable.getApplicationAttemptId()
|
||||
+ " on node: " + node);
|
||||
}
|
||||
|
||||
node.getReservedAppSchedulable().assignReservedContainer(node);
|
||||
}
|
||||
}
|
||||
|
@ -1065,6 +1067,18 @@ public class FairScheduler extends
|
|||
updateRootQueueMetrics();
|
||||
}
|
||||
|
||||
private boolean fitInMaxShare(FSQueue queue) {
|
||||
if (Resources.fitsIn(queue.getResourceUsage(), queue.getMaxShare())) {
|
||||
return false;
|
||||
}
|
||||
|
||||
FSQueue parentQueue = queue.getParent();
|
||||
if (parentQueue != null) {
|
||||
return fitInMaxShare(parentQueue);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
public FSAppAttempt getSchedulerApp(ApplicationAttemptId appAttemptId) {
|
||||
return super.getApplicationAttempt(appAttemptId);
|
||||
}
|
||||
|
|
|
@ -722,6 +722,85 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
|
||||
}
|
||||
|
||||
@Test (timeout = 5000)
|
||||
public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||
out.println("<?xml version=\"1.0\"?>");
|
||||
out.println("<allocations>");
|
||||
out.println("<queue name=\"root\">");
|
||||
out.println("<queue name=\"queue1\">");
|
||||
out.println("<minResources>1024mb,5vcores</minResources>");
|
||||
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||
out.println("</queue>");
|
||||
out.println("<queue name=\"queue2\">");
|
||||
out.println("<minResources>1024mb,5vcores</minResources>");
|
||||
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||
out.println("</queue>");
|
||||
out.println("</queue>");
|
||||
out.println("</allocations>");
|
||||
out.close();
|
||||
|
||||
scheduler.init(conf);
|
||||
scheduler.start();
|
||||
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||
|
||||
// Add a node
|
||||
RMNode node1 =
|
||||
MockNodes
|
||||
.newNodeInfo(1, Resources.createResource(3072, 5), 1, "127.0.0.1");
|
||||
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||
scheduler.handle(nodeEvent1);
|
||||
|
||||
// Queue 1 requests full capacity of the queue
|
||||
createSchedulingRequest(2048, "queue1", "user1", 1);
|
||||
scheduler.update();
|
||||
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
// Make sure queue 1 is allocated app capacity
|
||||
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
|
||||
getResourceUsage().getMemory());
|
||||
|
||||
// Now queue 2 requests likewise
|
||||
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user2", 1);
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
// Make sure queue 2 is allocated app capacity
|
||||
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
|
||||
getResourceUsage().getMemory());
|
||||
|
||||
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
|
||||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
// Make sure queue 1 is waiting with a reservation
|
||||
assertEquals(1024, scheduler.getSchedulerApp(attId1)
|
||||
.getCurrentReservation().getMemory());
|
||||
|
||||
// Now remove app of queue2
|
||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
||||
attId, RMAppAttemptState.FINISHED, false);
|
||||
scheduler.update();
|
||||
scheduler.handle(appRemovedEvent1);
|
||||
|
||||
// Queue should have no apps
|
||||
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
|
||||
getResourceUsage().getMemory());
|
||||
|
||||
createSchedulingRequest(1024, "queue2", "user2", 1);
|
||||
scheduler.handle(updateEvent);
|
||||
// Make sure allocated memory of queue1 doesn't exceed its maximum
|
||||
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
|
||||
getResourceUsage().getMemory());
|
||||
//the reservation of queue1 should be reclaim
|
||||
assertEquals(0, scheduler.getSchedulerApp(attId1).
|
||||
getCurrentReservation().getMemory());
|
||||
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
|
||||
getResourceUsage().getMemory());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserAsDefaultQueue() throws Exception {
|
||||
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true");
|
||||
|
@ -2076,9 +2155,10 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
scheduler.handle(updateEvent);
|
||||
|
||||
assertEquals(1, app.getLiveContainers().size());
|
||||
// Reserved container should still be at lower priority
|
||||
// Reserved container should will be at higher priority,
|
||||
// since old reservation cannot be satisfied
|
||||
for (RMContainer container : app.getReservedContainers()) {
|
||||
assertEquals(2, container.getReservedPriority().getPriority());
|
||||
assertEquals(1, container.getReservedPriority().getPriority());
|
||||
}
|
||||
|
||||
// Complete container
|
||||
|
@ -2091,11 +2171,11 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|||
scheduler.update();
|
||||
scheduler.handle(updateEvent);
|
||||
|
||||
// Reserved container (at lower priority) should be run
|
||||
// Reserved container (at higher priority) should be run
|
||||
Collection<RMContainer> liveContainers = app.getLiveContainers();
|
||||
assertEquals(1, liveContainers.size());
|
||||
for (RMContainer liveContainer : liveContainers) {
|
||||
Assert.assertEquals(2, liveContainer.getContainer().getPriority().getPriority());
|
||||
Assert.assertEquals(1, liveContainer.getContainer().getPriority().getPriority());
|
||||
}
|
||||
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
|
||||
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
|
||||
|
|
Loading…
Reference in New Issue