YARN-3885. ProportionalCapacityPreemptionPolicy doesn't preempt if queue is more than 2 level. (Ajith S via wangda)
This commit is contained in:
parent
fa2b63ed16
commit
3540d5fe4b
|
@ -640,6 +640,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3930. FileSystemNodeLabelsStore should make sure edit log file closed when
|
YARN-3930. FileSystemNodeLabelsStore should make sure edit log file closed when
|
||||||
exception is thrown. (Dian Fu via wangda)
|
exception is thrown. (Dian Fu via wangda)
|
||||||
|
|
||||||
|
YARN-3885. ProportionalCapacityPreemptionPolicy doesn't preempt if queue is
|
||||||
|
more than 2 level. (Ajith S via wangda)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -896,8 +896,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
||||||
ret.untouchableExtra = Resource.newInstance(0, 0);
|
ret.untouchableExtra = Resource.newInstance(0, 0);
|
||||||
} else {
|
} else {
|
||||||
ret.untouchableExtra =
|
ret.untouchableExtra =
|
||||||
Resources.subtractFrom(extra, childrensPreemptable);
|
Resources.subtract(extra, childrensPreemptable);
|
||||||
}
|
}
|
||||||
|
ret.preemptableExtra = Resources.min(
|
||||||
|
rc, partitionResource, childrensPreemptable, extra);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
addTempQueuePartition(ret);
|
addTempQueuePartition(ret);
|
||||||
|
@ -1127,4 +1129,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
|
||||||
|
return queueToPartitions;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
|
@ -898,6 +899,37 @@ public class TestProportionalCapacityPreemptionPolicy {
|
||||||
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testHierarchicalLarge3Levels() {
|
||||||
|
int[][] qData = new int[][] {
|
||||||
|
// / A F I
|
||||||
|
// B C G H J K
|
||||||
|
// D E
|
||||||
|
{ 400, 200, 60, 140, 100, 40, 100, 70, 30, 100, 10, 90 }, // abs
|
||||||
|
{ 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
|
||||||
|
{ 400, 210, 60, 150, 100, 50, 100, 50, 50, 90, 10, 80 }, // used
|
||||||
|
{ 10, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 10 }, // pending
|
||||||
|
{ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 }, // reserved
|
||||||
|
// appA appB appC appD appE appF appG
|
||||||
|
{ 7, 3, 1, 2, 1, 1, 2, 1, 1, 2, 1, 1 }, // apps
|
||||||
|
{ -1, -1, 1, -1, 1, 1, -1, 1, 1, -1, 1, 1 }, // req granularity
|
||||||
|
{ 3, 2, 0, 2, 0, 0, 2, 0, 0, 2, 0, 0 }, // subqueues
|
||||||
|
};
|
||||||
|
ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
|
||||||
|
policy.editSchedule();
|
||||||
|
// XXX note: compensating for rounding error in Resources.multiplyTo
|
||||||
|
// which is likely triggered since we use small numbers for readability
|
||||||
|
//run with Logger.getRootLogger().setLevel(Level.DEBUG);
|
||||||
|
verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appC)));
|
||||||
|
assertEquals(10, policy.getQueuePartitions().get("queueE").get("").preemptableExtra.getMemory());
|
||||||
|
//2nd level child(E) preempts 10, but parent A has only 9 extra
|
||||||
|
//check the parent can prempt only the extra from > 2 level child
|
||||||
|
TempQueuePerPartition tempQueueAPartition = policy.getQueuePartitions().get("queueA").get("");
|
||||||
|
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemory());
|
||||||
|
int extraForQueueA = tempQueueAPartition.current.getMemory()- tempQueueAPartition.guaranteed.getMemory();
|
||||||
|
assertEquals(extraForQueueA,tempQueueAPartition.preemptableExtra.getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
static class IsPreemptionRequestFor
|
static class IsPreemptionRequestFor
|
||||||
extends ArgumentMatcher<ContainerPreemptEvent> {
|
extends ArgumentMatcher<ContainerPreemptEvent> {
|
||||||
private final ApplicationAttemptId appAttId;
|
private final ApplicationAttemptId appAttId;
|
||||||
|
|
Loading…
Reference in New Issue