YARN-2056. Disable preemption at Queue level. Contributed by Eric Payne

Commit 4b13082199 by Jason Lowe, 2014-12-05 21:06:48 +00:00
Parent: 3c72f54ef5
3 changed files with 425 additions and 32 deletions

File: hadoop-yarn-project/CHANGES.txt

@@ -126,6 +126,8 @@ Release 2.7.0 - UNRELEASED
YARN-2301. Improved yarn container command. (Naganarasimha G R via jianhe)
YARN-2056. Disable preemption at Queue level (Eric Payne via jlowe)
OPTIMIZATIONS
BUG FIXES

File: ProportionalCapacityPreemptionPolicy.java (org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity)

@@ -27,6 +27,7 @@
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.commons.logging.Log;
@@ -111,6 +112,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy
public static final String NATURAL_TERMINATION_FACTOR =
"yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
public static final String BASE_YARN_RM_PREEMPTION = "yarn.scheduler.capacity.";
public static final String SUFFIX_DISABLE_PREEMPTION = ".disable_preemption";
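Together these two constants form the per-queue flag name, "yarn.scheduler.capacity.<queue-path>.disable_preemption". A minimal sketch of the lookup this patch performs (illustrative only, not part of the diff; "root.queueA" is an example path, and scheduler/parentDisablePreempt are as in cloneQueues below):

// Illustrative sketch: composing and reading the per-queue flag.
// For the example path "root.queueA" the property name is
// "yarn.scheduler.capacity.root.queueA.disable_preemption".
String propName = BASE_YARN_RM_PREEMPTION
    + "root.queueA" + SUFFIX_DISABLE_PREEMPTION;
// Passing the parent's value as the default is what lets a queue
// inherit the setting when it does not set the flag itself.
boolean disablePreemption = scheduler.getConfiguration()
    .getBoolean(propName, parentDisablePreempt);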
// the dispatcher to send preempt and kill events
public EventHandler<ContainerPreemptEvent> dispatcher;
@@ -192,7 +196,7 @@ private void containerBasedPreemptOrKill(CSQueue root,
// extract a summary of the queues from scheduler
TempQueue tRoot;
synchronized (scheduler) {
tRoot = cloneQueues(root, clusterResources, false);
}
// compute the ideal distribution of resources among queues
@@ -370,28 +374,60 @@ private void computeIdealResourceDistribution(ResourceCalculator rc,
private void computeFixpointAllocation(ResourceCalculator rc,
Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
boolean ignoreGuarantee) {
// Prior to assigning the unused resources, process each queue as follows:
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
// Else idealAssigned = current;
// Subtract idealAssigned resources from unassigned.
// If the queue has all of its needs met (that is, if
// idealAssigned >= current + pending), remove it from consideration.
// Sort queues from most under-guaranteed to most over-guaranteed.
TQComparator tqComparator = new TQComparator(rc, tot_guarant);
PriorityQueue<TempQueue> orderedByNeed =
new PriorityQueue<TempQueue>(10,tqComparator);
for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
TempQueue q = i.next();
if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
} else {
q.idealAssigned = Resources.clone(q.current);
}
Resources.subtractFrom(unassigned, q.idealAssigned);
// If idealAssigned < (current + pending), q needs more resources, so
// add it to the list of underserved queues, ordered by need.
Resource curPlusPend = Resources.add(q.current, q.pending);
if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
orderedByNeed.add(q);
}
}
//assign all cluster resources until no more demand, or no resources are left
while (!orderedByNeed.isEmpty()
&& Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
Resource wQassigned = Resource.newInstance(0, 0);
// we compute normalizedGuarantees capacity based on currently active
// queues
resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
// For each underserved queue (or set of queues if multiple are equally
// underserved), offer its share of the unassigned resources based on its
// normalized guarantee. After the offer, if the queue is not satisfied,
// place it back in the ordered list of queues, recalculating its place
// in the order of most under-guaranteed to most over-guaranteed. In this
// way, the most underserved queue(s) are always given resources first.
Collection<TempQueue> underserved =
getMostUnderservedQueues(orderedByNeed, tqComparator);
for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
TempQueue sub = i.next();
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
Resource wQdone = Resources.subtract(wQavail, wQidle);
if (Resources.greaterThan(rc, tot_guarant,
wQdone, Resources.none())) {
// The queue is still asking for more. Put it back in the priority
// queue, recalculating its order based on need.
orderedByNeed.add(sub);
}
Resources.addTo(wQassigned, wQdone);
}
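As a worked example of the seeding pass at the top of computeFixpointAllocation, here is a standalone sketch with plain ints standing in for Resource objects (assumes a single memory dimension; all numbers are made up):

public class FixpointSeedSketch {
  public static void main(String[] args) {
    int clusterTotal = 100;
    int current = 60, guaranteed = 40, untouchableExtra = 5, pending = 10;
    // A queue over its guarantee is seeded with its guarantee plus the
    // extra it may keep; otherwise it is seeded with what it uses now.
    int idealAssigned = (current > guaranteed)
        ? guaranteed + untouchableExtra // 45; the other 15 is preemptable
        : current;
    int unassigned = clusterTotal - idealAssigned; // 55 left to distribute
    // The queue stays in orderedByNeed only while it still wants more.
    boolean underserved = idealAssigned < current + pending; // true here
    System.out.println(idealAssigned + " " + unassigned + " " + underserved);
  }
}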
@@ -399,6 +435,27 @@ private void computeFixpointAllocation(ResourceCalculator rc,
}
}
// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
protected Collection<TempQueue> getMostUnderservedQueues(
PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
while (!orderedByNeed.isEmpty()) {
TempQueue q1 = orderedByNeed.remove();
underserved.add(q1);
TempQueue q2 = orderedByNeed.peek();
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
// return what has already been collected. Otherwise, q1's pct of
// guaranteed == that of q2, so add q2 to underserved list during the
// next pass.
if (q2 == null || tqComparator.compare(q1,q2) < 0) {
return underserved;
}
}
return underserved;
}
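The same drain-equal-heads idiom, shown on a bare PriorityQueue of doubles (a standalone sketch; the class name and values are invented for illustration):

import java.util.ArrayList;
import java.util.Collection;
import java.util.PriorityQueue;

public class DrainEqualHeads {
  // Remove the head, then keep removing while the next element ties it.
  static Collection<Double> mostUnderserved(PriorityQueue<Double> pq) {
    ArrayList<Double> out = new ArrayList<Double>();
    while (!pq.isEmpty()) {
      Double head = pq.remove();
      out.add(head);
      Double next = pq.peek();
      if (next == null || head.compareTo(next) < 0) {
        break; // the next element is strictly less needy; stop collecting
      }
    }
    return out;
  }

  public static void main(String[] args) {
    PriorityQueue<Double> pq = new PriorityQueue<Double>();
    pq.add(0.5); pq.add(0.5); pq.add(0.9);
    System.out.println(mostUnderserved(pq)); // prints [0.5, 0.5]
  }
}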
/**
* Computes a normalizedGuaranteed capacity based on active queues
* @param rc resource calculator
@@ -626,9 +683,11 @@ public String getPolicyName() {
*
* @param root the root of the CapacityScheduler queue hierarchy
* @param clusterResources the total amount of resources in the cluster
* @param parentDisablePreempt true if disable preemption is set for parent
* @return the root of the cloned queue hierarchy
*/
private TempQueue cloneQueues(CSQueue root, Resource clusterResources,
boolean parentDisablePreempt) {
TempQueue ret;
synchronized (root) {
String queueName = root.getQueueName();
@@ -639,19 +698,46 @@ private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
Resource current = Resources.multiply(clusterResources, absUsed);
Resource guaranteed = Resources.multiply(clusterResources, absCap);
Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
boolean queueDisablePreemption = false;
String queuePropName = BASE_YARN_RM_PREEMPTION + root.getQueuePath()
+ SUFFIX_DISABLE_PREEMPTION;
queueDisablePreemption = scheduler.getConfiguration()
.getBoolean(queuePropName, parentDisablePreempt);
Resource extra = Resource.newInstance(0, 0);
if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
extra = Resources.subtract(current, guaranteed);
}
if (root instanceof LeafQueue) {
LeafQueue l = (LeafQueue) root;
Resource pending = l.getTotalResourcePending();
ret = new TempQueue(queueName, current, pending, guaranteed,
maxCapacity);
if (queueDisablePreemption) {
ret.untouchableExtra = extra;
} else {
ret.preemptableExtra = extra;
}
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
maxCapacity);
Resource childrensPreemptable = Resource.newInstance(0, 0);
for (CSQueue c : root.getChildQueues()) {
TempQueue subq =
cloneQueues(c, clusterResources, queueDisablePreemption);
Resources.addTo(childrensPreemptable, subq.preemptableExtra);
ret.addChild(subq);
}
// untouchableExtra = max(extra - childrenPreemptable, 0)
if (Resources.greaterThanOrEqual(
rc, clusterResources, childrensPreemptable, extra)) {
ret.untouchableExtra = Resource.newInstance(0, 0);
} else {
ret.untouchableExtra =
Resources.subtractFrom(extra, childrensPreemptable);
}
}
}
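Worked numbers for the parent-queue branch above (standalone sketch; ints in place of Resources, values invented):

public class UntouchableExtraSketch {
  public static void main(String[] args) {
    int extra = 30;                // parent usage above its guarantee
    int childrensPreemptable = 20; // sum of children's preemptableExtra
    // untouchableExtra = max(extra - childrensPreemptable, 0): a parent
    // 30 over guarantee whose children can only give back 20 keeps 10.
    int untouchableExtra = Math.max(extra - childrensPreemptable, 0);
    System.out.println(untouchableExtra); // 10
  }
}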
@@ -690,6 +776,8 @@ static class TempQueue {
Resource idealAssigned;
Resource toBePreempted;
Resource actuallyPreempted;
Resource untouchableExtra;
Resource preemptableExtra;
double normalizedGuarantee;
@@ -708,6 +796,8 @@ static class TempQueue {
this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN;
this.children = new ArrayList<TempQueue>();
this.untouchableExtra = Resource.newInstance(0, 0);
this.preemptableExtra = Resource.newInstance(0, 0);
}
public void setLeafQueue(LeafQueue l){
@@ -761,10 +851,20 @@ public String toString() {
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
.append(" ACTUAL_PREEMPT: ").append(actuallyPreempted)
.append(" UNTOUCHABLE: ").append(untouchableExtra)
.append(" PREEMPTABLE: ").append(preemptableExtra)
.append("\n");
return sb.toString();
}
public void printAll() {
LOG.info(this.toString());
for (TempQueue sub : this.getChildren()) {
sub.printAll();
}
}
public void assignPreemption(float scalingFactor,
ResourceCalculator rc, Resource clusterResource) {
if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
@@ -793,4 +893,38 @@ void appendLogString(StringBuilder sb) {
}
static class TQComparator implements Comparator<TempQueue> {
private ResourceCalculator rc;
private Resource clusterRes;
TQComparator(ResourceCalculator rc, Resource clusterRes) {
this.rc = rc;
this.clusterRes = clusterRes;
}
@Override
public int compare(TempQueue tq1, TempQueue tq2) {
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
return -1;
}
if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
return 1;
}
return 0;
}
// Calculates idealAssigned / guaranteed
// TempQueues with 0 guarantees are always considered the most over
// capacity and therefore considered last for resources.
private double getIdealPctOfGuaranteed(TempQueue q) {
double pctOver = Integer.MAX_VALUE;
if (q != null && Resources.greaterThan(
rc, clusterRes, q.guaranteed, Resources.none())) {
pctOver =
Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
}
return (pctOver);
}
}
}
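The ordering TQComparator produces can be summarized with plain ratios (standalone sketch; names and values are invented):

public class TQOrderSketch {
  // Mirrors getIdealPctOfGuaranteed with ints: idealAssigned / guaranteed,
  // with zero-guarantee queues pinned to the end of the order.
  static double pct(int idealAssigned, int guaranteed) {
    return guaranteed > 0
        ? (double) idealAssigned / guaranteed
        : Integer.MAX_VALUE;
  }

  public static void main(String[] args) {
    System.out.println(pct(20, 40)); // 0.5 -> most underserved, served first
    System.out.println(pct(40, 40)); // 1.0
    System.out.println(pct(10, 0));  // 2.147483647E9 -> considered last
  }
}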

File: TestProportionalCapacityPreemptionPolicy.java (org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity)

@@ -17,16 +17,19 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.BASE_YARN_RM_PREEMPTION;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.SUFFIX_DISABLE_PREEMPTION;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
@@ -62,6 +65,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -86,6 +90,7 @@ public class TestProportionalCapacityPreemptionPolicy {
Clock mClock = null;
Configuration conf = null;
CapacityScheduler mCS = null;
CapacitySchedulerConfiguration schedConf = null;
EventHandler<ContainerPreemptEvent> mDisp = null;
ResourceCalculator rc = new DefaultResourceCalculator();
final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
@@ -98,6 +103,8 @@ public class TestProportionalCapacityPreemptionPolicy {
ApplicationId.newInstance(TS, 3), 0);
final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
final ApplicationAttemptId appF = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(TS, 4), 0);
final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
ArgumentCaptor.forClass(ContainerPreemptEvent.class);
@@ -123,6 +130,8 @@ public void setup() {
mClock = mock(Clock.class);
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
schedConf = new CapacitySchedulerConfiguration();
when(mCS.getConfiguration()).thenReturn(schedConf);
mDisp = mock(EventHandler.class);
rand = new Random();
long seed = rand.nextLong();
@@ -265,6 +274,240 @@ public void testDeadzone() {
verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
}
@Test
public void testPerQueueDisablePreemption() {
int[][] qData = new int[][]{
// / A B C
{ 100, 55, 25, 20 }, // abs
{ 100, 100, 100, 100 }, // maxCap
{ 100, 0, 54, 46 }, // used
{ 10, 10, 0, 0 }, // pending
{ 0, 0, 0, 0 }, // reserved
// appA appB appC
{ 3, 1, 1, 1 }, // apps
{ -1, 1, 1, 1 }, // req granularity
{ 3, 0, 0, 0 }, // subqueues
};
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// With PREEMPTION_DISABLED set for queueB, get resources from queueC
verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB)));
// With no PREEMPTION_DISABLED set for queueB, resources will be preempted
// from both queueB and queueC. Test must be reset so that the mDisp
// event handler will count only events from the following test and not the
// previous one.
setup();
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueB" + SUFFIX_DISABLE_PREEMPTION, false);
policy2.editSchedule();
verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appB)));
verify(mDisp, times(6)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
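Reading the qData columns above (my arithmetic from the table, not test output):

// Cluster of 100: queueA is guaranteed 55 but uses 0 and has 10 pending;
// queueB is guaranteed 25 but uses 54 (29 over); queueC is guaranteed 20
// but uses 46 (26 over). With queueB's extra untouchable, all 10
// containers must come from queueC (appC); with both queues preemptable,
// the 10 are split between queueB (appB, 4) and queueC (appC, 6).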
@Test
public void testPerQueueDisablePreemptionHierarchical() {
int[][] qData = new int[][] {
// / A D
// B C E F
{ 200, 100, 50, 50, 100, 10, 90 }, // abs
{ 200, 200, 200, 200, 200, 200, 200 }, // maxCap
{ 200, 110, 60, 50, 90, 90, 0 }, // used
{ 10, 0, 0, 0, 10, 0, 10 }, // pending
{ 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD
{ 4, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 2, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// verify capacity taken from queueB (appA), not queueE (appC) despite
// queueE being far over its absolute capacity because queueA (queueB's
// parent) is over capacity and queueD (queueE's parent) is not.
ApplicationAttemptId expectedAttemptOnQueueB =
ApplicationAttemptId.newInstance(
appA.getApplicationId(), appA.getAttemptId());
assertTrue("appA should be running on queueB",
mCS.getAppsInQueue("queueB").contains(expectedAttemptOnQueueB));
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
// Need to call setup() again to reset mDisp
setup();
// Disable preemption for queueB and its children
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
ApplicationAttemptId expectedAttemptOnQueueC =
ApplicationAttemptId.newInstance(
appB.getApplicationId(), appB.getAttemptId());
ApplicationAttemptId expectedAttemptOnQueueE =
ApplicationAttemptId.newInstance(
appC.getApplicationId(), appC.getAttemptId());
// Now, all of queueB's (appA) over capacity is not preemptable, so neither
// is queueA's. Verify that capacity is taken from queueE (appC).
assertTrue("appB should be running on queueC",
mCS.getAppsInQueue("queueC").contains(expectedAttemptOnQueueC));
assertTrue("appC should be running on queueE",
mCS.getAppsInQueue("queueE").contains(expectedAttemptOnQueueE));
// Resources should have come from queueE (appC) and neither of queueA's
// children.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testPerQueueDisablePreemptionBroadHierarchical() {
int[][] qData = new int[][] {
// / A D G
// B C E F H I
{1000, 350, 150, 200, 400, 200, 200, 250, 100, 150 }, // abs
{1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
{1000, 400, 200, 200, 400, 250, 150, 200, 150, 50 }, // used
{ 50, 0, 0, 0, 50, 0, 50, 0, 0, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// queueF(appD) wants resources. Verify that resources come from queueE(appC)
// because it's a sibling and queueB(appA) because queueA is over capacity.
verify(mDisp, times(28)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(22)).handle(argThat(new IsPreemptionRequestFor(appC)));
// Need to call setup() again to reset mDisp
setup();
// Disable preemption for queueB(appA)
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
// Now that queueB(appA) is not preemptable, verify that resources come
// from queueE(appC)
verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(appC)));
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA)));
setup();
// Disable preemption for two of the 3 queues that are over capacity.
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueD.queueE" + SUFFIX_DISABLE_PREEMPTION, true);
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueA.queueB" + SUFFIX_DISABLE_PREEMPTION, true);
ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
policy3.editSchedule();
// Verify that the request was starved out even though queueH(appE) is
// over capacity. This is because queueG (queueH's parent) is NOT
// over capacity.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueB
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
}
@Test
public void testPerQueueDisablePreemptionInheritParent() {
int[][] qData = new int[][] {
// / A E
// B C D F G H
{1000, 500, 200, 200, 100, 500, 200, 200, 100 }, // abs (guar)
{1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
{1000, 700, 0, 350, 350, 300, 0, 200, 100 }, // used
{ 200, 0, 0, 0, 0, 200, 200, 0, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE
{ 5, 2, 0, 1, 1, 3, 1, 1, 1 }, // apps
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// With all queues preemptable, resources should be taken from queueC(appA)
// and queueD(appB). Resources taken more from queueD(appB) than
// queueC(appA) because it's over its capacity by a larger percentage.
verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(182)).handle(argThat(new IsPreemptionRequestFor(appB)));
// Disable preemption for queueA and its children. queueF(appC)'s request
// should starve.
setup(); // Call setup() to reset mDisp
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueA" + SUFFIX_DISABLE_PREEMPTION, true);
ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
policy2.editSchedule();
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueD
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueG
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
}
@Test
public void testPerQueuePreemptionNotAllUntouchable() {
int[][] qData = new int[][] {
// / A E
// B C D F G H
{ 2000, 1000, 800, 100, 100, 1000, 500, 300, 200 }, // abs
{ 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000, 2000 }, // maxCap
{ 2000, 1300, 300, 800, 200, 700, 500, 0, 200 }, // used
{ 300, 0, 0, 0, 0, 300, 0, 300, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 3, 1, 1, 1, 3, 1, 1, 1 }, // apps
{ -1, -1, 1, 1, 1, -1, 1, 1, 1 }, // req granularity
{ 2, 3, 0, 0, 0, 3, 0, 0, 0 }, // subqueues
};
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root.queueA.queueC" + SUFFIX_DISABLE_PREEMPTION, true);
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
// Although queueC(appB) is way over capacity and is untouchable,
// queueD(appC) is preemptable. Request should be filled from queueD(appC).
verify(mDisp, times(100)).handle(argThat(new IsPreemptionRequestFor(appC)));
}
@Test
public void testPerQueueDisablePreemptionRootDisablesAll() {
int[][] qData = new int[][] {
// / A D G
// B C E F H I
{1000, 500, 250, 250, 250, 100, 150, 250, 100, 150 }, // abs
{1000,1000,1000,1000,1000,1000,1000,1000,1000,1000 }, // maxCap
{1000, 20, 0, 20, 490, 240, 250, 490, 240, 250 }, // used
{ 200, 200, 200, 0, 0, 0, 0, 0, 0, 0 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
schedConf.setBoolean(BASE_YARN_RM_PREEMPTION
+ "root" + SUFFIX_DISABLE_PREEMPTION, true);
policy.editSchedule();
// All queues should be non-preemptable, so request should starve.
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appB))); // queueC
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appC))); // queueE
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appD))); // queueB
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appE))); // queueH
verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appF))); // queueI
}
@Test
public void testOverCapacityImbalance() {
int[][] qData = new int[][]{
@@ -341,7 +584,7 @@ public void testHierarchical() {
policy.editSchedule();
// verify capacity taken from A1, not B1 despite B1 being far over
// its absolute guaranteed capacity
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
}
@Test
@@ -390,15 +633,17 @@ public void testZeroGuarOverCap() {
@Test
public void testHierarchicalLarge() {
int[][] qData = new int[][] {
// /    A   D   G
//        B  C    E  F    H  I
{ 400, 200, 60, 140, 100, 70, 30, 100, 10, 90 }, // abs
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
{ 400, 210, 70, 140, 100, 50, 50, 90, 90, 0 }, // used
{ 15, 0, 0, 0, 0, 0, 0, 0, 0, 15 }, // pending
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
// appA appB appC appD appE appF
{ 6, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
{ -1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
{ 3, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
};
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
policy.editSchedule();
@@ -407,8 +652,8 @@ public void testHierarchicalLarge() {
// XXX note: compensating for rounding error in Resources.multiplyTo
// which is likely triggered since we use small numbers for readability
verify(mDisp, times(7)).handle(argThat(new IsPreemptionRequestFor(appA)));
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appE)));
}
@Test
@@ -629,6 +874,7 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
when(root.getQueuePath()).thenReturn("root");
for (int i = 1; i < queues.length; ++i) {
final CSQueue q;
@@ -644,6 +890,10 @@ ParentQueue mockNested(int[] abs, int[] maxCap, int[] used,
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
String parentPathName = p.getQueuePath();
parentPathName = (parentPathName == null) ? "root" : parentPathName;
String queuePathName = (parentPathName+"."+queueName).replace("/","root");
when(q.getQueuePath()).thenReturn(queuePathName);
}
assert 0 == pqs.size();
return root;
@@ -666,6 +916,8 @@ ParentQueue mockParentQueue(ParentQueue p, int subqueues,
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
List<ApplicationAttemptId> appAttemptIdList =
new ArrayList<ApplicationAttemptId>();
when(lq.getTotalResourcePending()).thenReturn(
Resource.newInstance(pending[i], 0));
// consider moving where CapacityScheduler::comparator accessible
@@ -683,9 +935,14 @@ public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
int aPending = pending[i] / apps[i];
int aReserve = reserved[i] / apps[i];
for (int a = 0; a < apps[i]; ++a) {
FiCaSchedulerApp mockFiCaApp =
mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]);
qApps.add(mockFiCaApp);
++appAlloc;
appAttemptIdList.add(mockFiCaApp.getApplicationAttemptId());
}
when(mCS.getAppsInQueue("queue" + (char)('A' + i - 1)))
.thenReturn(appAttemptIdList);
}
when(lq.getApplications()).thenReturn(qApps);
if(setAMResourcePercent != 0.0f){