YARN-3275. CapacityScheduler: Preemption happening on non-preemptable queues. Contributed by Eric Payne

(cherry picked from commit 27e8ea820f)
This commit is contained in:
Jason Lowe 2015-03-06 22:36:18 +00:00
parent ab251fd355
commit ef3d9bdf6b
4 changed files with 54 additions and 5 deletions

View File

@ -677,6 +677,9 @@ Release 2.7.0 - UNRELEASED
YARN-3227. Timeline renew delegation token fails when RM user's TGT is expired
(Zhijie Shen via xgong)
YARN-3275. CapacityScheduler: Preemption happening on non-preemptable
queues (Eric Payne via jlowe)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -260,4 +260,9 @@ public class Resources {
return createResource(Math.min(lhs.getMemory(), rhs.getMemory()),
Math.min(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
public static Resource componentwiseMax(Resource lhs, Resource rhs) {
return createResource(Math.max(lhs.getMemory(), rhs.getMemory()),
Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
}

View File

@ -527,6 +527,17 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
for (TempQueue qT : queues) {
if (qT.preemptionDisabled && qT.leafQueue != null) {
if (LOG.isDebugEnabled()) {
if (Resources.greaterThan(rc, clusterResource,
qT.toBePreempted, Resource.newInstance(0, 0))) {
LOG.debug("Tried to preempt the following "
+ "resources from non-preemptable queue: "
+ qT.queueName + " - Resources: " + qT.toBePreempted);
}
}
continue;
}
// we act only if we are violating balance by more than
// maxIgnoredOverCapacity
if (Resources.greaterThan(rc, clusterResource, qT.current,
@ -734,6 +745,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
float absUsed = root.getAbsoluteUsedCapacity();
float absCap = root.getAbsoluteCapacity();
float absMaxCap = root.getAbsoluteMaximumCapacity();
boolean preemptionDisabled = root.getPreemptionDisabled();
Resource current = Resources.multiply(clusterResources, absUsed);
Resource guaranteed = Resources.multiply(clusterResources, absCap);
@ -747,8 +759,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
ret = new TempQueue(queueName, current, pending, guaranteed,
maxCapacity);
if (root.getPreemptionDisabled()) {
maxCapacity, preemptionDisabled);
if (preemptionDisabled) {
ret.untouchableExtra = extra;
} else {
ret.preemptableExtra = extra;
@ -757,7 +769,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
} else {
Resource pending = Resource.newInstance(0, 0);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
maxCapacity);
maxCapacity, false);
Resource childrensPreemptable = Resource.newInstance(0, 0);
for (CSQueue c : root.getChildQueues()) {
TempQueue subq = cloneQueues(c, clusterResources);
@ -816,9 +828,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
final ArrayList<TempQueue> children;
LeafQueue leafQueue;
boolean preemptionDisabled;
TempQueue(String queueName, Resource current, Resource pending,
Resource guaranteed, Resource maxCapacity) {
Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
@ -831,6 +844,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
this.children = new ArrayList<TempQueue>();
this.untouchableExtra = Resource.newInstance(0, 0);
this.preemptableExtra = Resource.newInstance(0, 0);
this.preemptionDisabled = preemptionDisabled;
}
public void setLeafQueue(LeafQueue l){
@ -862,10 +876,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
Resource clusterResource) {
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
Resources.subtract(maxCapacity, idealAssigned),
Resource.newInstance(0, 0));
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
Resource accepted =
Resources.min(rc, clusterResource,
Resources.subtract(maxCapacity, idealAssigned),
absMaxCapIdealAssignedDelta,
Resources.min(rc, clusterResource, avail, Resources.subtract(
Resources.add(current, pending), idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);

View File

@ -531,6 +531,30 @@ public class TestProportionalCapacityPreemptionPolicy {
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
}
@Test
public void testPerQueueDisablePreemptionOverAbsMaxCapacity() {
int[][] qData = new int[][] {
// / A D
// B C E F
{1000, 725, 360, 365, 275, 17, 258 }, // absCap
{1000,1000,1000,1000, 550, 109,1000 }, // absMaxCap
{1000, 741, 396, 345, 259, 110, 149 }, // used
{ 40, 20, 0, 20, 20, 20, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granulrity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
// QueueE inherits non-preemption from QueueD
schedConf.setPreemptionDisabled("root.queueD", true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// appC is running on QueueE. QueueE is over absMaxCap, but is not
// preemptable. Therefore, appC resources should not be preempted.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testOverCapacityImbalance() {
int[][] qData = new int[][]{