YARN-6439. Fix ReservationSystem creation of default ReservationQueue. (Carlo Curino via wangda)

(cherry picked from commit 4d4ad0ebb7)
This commit is contained in:
Wangda Tan 2017-04-11 14:56:18 -07:00
parent b04c09163b
commit 2f08c86b7a
6 changed files with 52 additions and 3 deletions

View File

@ -40,10 +40,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.Permission;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import com.google.common.annotations.VisibleForTesting;
@ -219,6 +222,23 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
queue =
new PlanQueue(csContext, queueName, parent,
oldQueues.get(queueName));
//initializing the "internal" default queue, for SLS compatibility
String defReservationId =
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List<CSQueue> childQueues = new ArrayList<>();
ReservationQueue resQueue = new ReservationQueue(csContext,
defReservationId, (PlanQueue) queue);
try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
} catch (SchedulerDynamicEditException e) {
throw new IllegalStateException(e);
}
childQueues.add(resQueue);
((PlanQueue) queue).setChildQueues(childQueues);
queues.put(defReservationId, resQueue);
} else {
queue =
new LeafQueue(csContext, queueName, parent,

View File

@ -93,10 +93,10 @@ public class PlanQueue extends ParentQueue {
PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
if (newlyParsedParentQueue.getChildQueues().size() > 0) {
if (newlyParsedParentQueue.getChildQueues().size() != 1) {
throw new IOException(
"Reservable Queue should not have sub-queues in the"
+ "configuration");
+ "configuration expect the default reservation queue");
}
// Set new configs

View File

@ -153,6 +153,13 @@ public class TestCapacitySchedulerPlanFollower extends
assertTrue(csQueue.getCapacity() > 0.9);
}
@Override
protected void checkDefaultQueueBeforePlanFollowerRun(){
Queue defQ = getDefaultQueue();
Assert.assertEquals(0, getNumberOfApplications(defQ));
Assert.assertNotNull(defQ);
}
@Override
protected Queue getDefaultQueue() {
return cs.getQueue("dedicated" + ReservationConstants.DEFAULT_QUEUE_SUFFIX);

View File

@ -131,6 +131,10 @@ public class TestFairSchedulerPlanFollower extends
testPlanFollower(false);
}
@Override
protected void checkDefaultQueueBeforePlanFollowerRun() {
Assert.assertNull(getDefaultQueue());
}
@Override
protected void verifyCapacity(Queue defQ) {
assertTrue(((FSQueue) defQ).getWeights().getWeight(ResourceType.MEMORY) > 0.9);

View File

@ -89,6 +89,11 @@ public abstract class TestSchedulerPlanFollowerBase {
"dedicated", 10, 10 + f2.length, ReservationSystemTestUtil
.generateAllocation(10L, 1L, f2), res, minAlloc), false));
// default reseration queue should exist before run of PlanFollower AND have
// no apps
checkDefaultQueueBeforePlanFollowerRun();
AbstractSchedulerPlanFollower planFollower = createPlanFollower();
when(mClock.getTime()).thenReturn(0L);
@ -108,8 +113,8 @@ public abstract class TestSchedulerPlanFollowerBase {
new AppAttemptAddedSchedulerEvent(appAttemptId_0, false);
scheduler.handle(appAttemptAddedEvent);
// initial default reservation queue should have no apps
// initial default reservation queue should have no apps after first run
Queue defQ = getDefaultQueue();
Assert.assertEquals(0, getNumberOfApplications(defQ));
@ -179,6 +184,8 @@ public abstract class TestSchedulerPlanFollowerBase {
verifyCapacity(defQ);
}
protected abstract void checkDefaultQueueBeforePlanFollowerRun();
protected abstract Queue getReservationQueue(String reservationId);
protected abstract void verifyCapacity(Queue defQ);

View File

@ -76,6 +76,12 @@ public class TestCapacitySchedulerDynamicBehavior {
public void testRefreshQueuesWithReservations() throws Exception {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
//set default queue capacity to zero
((ReservationQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement(
new QueueEntitlement(0f, 1f));
// Test add one reservation dynamically and manually modify capacity
ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
@ -120,6 +126,11 @@ public class TestCapacitySchedulerDynamicBehavior {
ReservationQueue a1 =
new ReservationQueue(cs, "a1", (PlanQueue) cs.getQueue("a"));
cs.addQueue(a1);
//set default queue capacity to zero
((ReservationQueue) cs
.getQueue("a" + ReservationConstants.DEFAULT_QUEUE_SUFFIX))
.setEntitlement(
new QueueEntitlement(0f, 1f));
a1.setEntitlement(new QueueEntitlement(A1_CAPACITY / 100, 1f));
// Test add another reservation queue and use setEntitlement to modify