YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via kasha)
(cherry picked from commit fb238d7e5d
)
This commit is contained in:
parent
b94fbdf6f0
commit
c25154576b
|
@ -9,6 +9,8 @@ Release 2.9.0 - UNRELEASED
|
||||||
YARN-1856. Added cgroups based memory monitoring for containers as another
|
YARN-1856. Added cgroups based memory monitoring for containers as another
|
||||||
alternative to custom memory-monitoring. (Varun Vasudev via vinodkv)
|
alternative to custom memory-monitoring. (Varun Vasudev via vinodkv)
|
||||||
|
|
||||||
|
YARN-4462. FairScheduler: Disallow preemption from a queue. (Tao Jie via kasha)
|
||||||
|
|
||||||
IMPROVEMENTS
|
IMPROVEMENTS
|
||||||
|
|
||||||
YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and
|
YARN-4072. ApplicationHistoryServer, WebAppProxyServer, NodeManager and
|
||||||
|
|
|
@ -98,6 +98,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
// Reservation system configuration
|
// Reservation system configuration
|
||||||
private ReservationQueueConfiguration globalReservationQueueConfig;
|
private ReservationQueueConfiguration globalReservationQueueConfig;
|
||||||
|
|
||||||
|
private final Set<String> nonPreemptableQueues;
|
||||||
|
|
||||||
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
public AllocationConfiguration(Map<String, Resource> minQueueResources,
|
||||||
Map<String, Resource> maxQueueResources,
|
Map<String, Resource> maxQueueResources,
|
||||||
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
||||||
|
@ -114,7 +116,8 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
QueuePlacementPolicy placementPolicy,
|
QueuePlacementPolicy placementPolicy,
|
||||||
Map<FSQueueType, Set<String>> configuredQueues,
|
Map<FSQueueType, Set<String>> configuredQueues,
|
||||||
ReservationQueueConfiguration globalReservationQueueConfig,
|
ReservationQueueConfiguration globalReservationQueueConfig,
|
||||||
Set<String> reservableQueues) {
|
Set<String> reservableQueues,
|
||||||
|
Set<String> nonPreemptableQueues) {
|
||||||
this.minQueueResources = minQueueResources;
|
this.minQueueResources = minQueueResources;
|
||||||
this.maxQueueResources = maxQueueResources;
|
this.maxQueueResources = maxQueueResources;
|
||||||
this.queueMaxApps = queueMaxApps;
|
this.queueMaxApps = queueMaxApps;
|
||||||
|
@ -135,6 +138,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
this.globalReservationQueueConfig = globalReservationQueueConfig;
|
this.globalReservationQueueConfig = globalReservationQueueConfig;
|
||||||
this.placementPolicy = placementPolicy;
|
this.placementPolicy = placementPolicy;
|
||||||
this.configuredQueues = configuredQueues;
|
this.configuredQueues = configuredQueues;
|
||||||
|
this.nonPreemptableQueues = nonPreemptableQueues;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AllocationConfiguration(Configuration conf) {
|
public AllocationConfiguration(Configuration conf) {
|
||||||
|
@ -161,6 +165,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
}
|
}
|
||||||
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
|
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
|
||||||
configuredQueues);
|
configuredQueues);
|
||||||
|
nonPreemptableQueues = new HashSet<String>();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -210,6 +215,10 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration {
|
||||||
-1f : fairSharePreemptionThreshold;
|
-1f : fairSharePreemptionThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPreemptable(String queueName) {
|
||||||
|
return !nonPreemptableQueues.contains(queueName);
|
||||||
|
}
|
||||||
|
|
||||||
public ResourceWeights getQueueWeight(String queue) {
|
public ResourceWeights getQueueWeight(String queue) {
|
||||||
ResourceWeights weight = queueWeights.get(queue);
|
ResourceWeights weight = queueWeights.get(queue);
|
||||||
return (weight == null) ? ResourceWeights.NEUTRAL : weight;
|
return (weight == null) ? ResourceWeights.NEUTRAL : weight;
|
||||||
|
|
|
@ -224,6 +224,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
||||||
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
||||||
Set<String> reservableQueues = new HashSet<String>();
|
Set<String> reservableQueues = new HashSet<String>();
|
||||||
|
Set<String> nonPreemptableQueues = new HashSet<String>();
|
||||||
int userMaxAppsDefault = Integer.MAX_VALUE;
|
int userMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
||||||
Resource queueMaxResourcesDefault = Resources.unbounded();
|
Resource queueMaxResourcesDefault = Resources.unbounded();
|
||||||
|
@ -360,7 +361,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||||
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
queuePolicies, minSharePreemptionTimeouts, fairSharePreemptionTimeouts,
|
||||||
fairSharePreemptionThresholds, queueAcls, configuredQueues,
|
fairSharePreemptionThresholds, queueAcls, configuredQueues,
|
||||||
reservableQueues);
|
reservableQueues, nonPreemptableQueues);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load placement policy and pass it configured queues
|
// Load placement policy and pass it configured queues
|
||||||
|
@ -409,7 +410,7 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
defaultSchedPolicy, minSharePreemptionTimeouts,
|
defaultSchedPolicy, minSharePreemptionTimeouts,
|
||||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls,
|
||||||
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
|
newPlacementPolicy, configuredQueues, globalReservationQueueConfig,
|
||||||
reservableQueues);
|
reservableQueues, nonPreemptableQueues);
|
||||||
|
|
||||||
lastSuccessfulReload = clock.getTime();
|
lastSuccessfulReload = clock.getTime();
|
||||||
lastReloadAttemptFailed = false;
|
lastReloadAttemptFailed = false;
|
||||||
|
@ -431,7 +432,8 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
Map<String, Float> fairSharePreemptionThresholds,
|
Map<String, Float> fairSharePreemptionThresholds,
|
||||||
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
||||||
Map<FSQueueType, Set<String>> configuredQueues,
|
Map<FSQueueType, Set<String>> configuredQueues,
|
||||||
Set<String> reservableQueues)
|
Set<String> reservableQueues,
|
||||||
|
Set<String> nonPreemptableQueues)
|
||||||
throws AllocationConfigurationException {
|
throws AllocationConfigurationException {
|
||||||
String queueName = element.getAttribute("name").trim();
|
String queueName = element.getAttribute("name").trim();
|
||||||
|
|
||||||
|
@ -508,13 +510,19 @@ public class AllocationFileLoaderService extends AbstractService {
|
||||||
isLeaf = false;
|
isLeaf = false;
|
||||||
reservableQueues.add(queueName);
|
reservableQueues.add(queueName);
|
||||||
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
configuredQueues.get(FSQueueType.PARENT).add(queueName);
|
||||||
|
} else if ("allowPreemptionFrom".equals(field.getTagName())) {
|
||||||
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
||||||
|
if (!Boolean.parseBoolean(text)) {
|
||||||
|
nonPreemptableQueues.add(queueName);
|
||||||
|
}
|
||||||
} else if ("queue".endsWith(field.getTagName()) ||
|
} else if ("queue".endsWith(field.getTagName()) ||
|
||||||
"pool".equals(field.getTagName())) {
|
"pool".equals(field.getTagName())) {
|
||||||
loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
||||||
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
queueMaxApps, userMaxApps, queueMaxAMShares, queueWeights,
|
||||||
queuePolicies, minSharePreemptionTimeouts,
|
queuePolicies, minSharePreemptionTimeouts,
|
||||||
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
|
fairSharePreemptionTimeouts, fairSharePreemptionThresholds,
|
||||||
queueAcls, configuredQueues, reservableQueues);
|
queueAcls, configuredQueues, reservableQueues,
|
||||||
|
nonPreemptableQueues);
|
||||||
isLeaf = false;
|
isLeaf = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -260,6 +260,14 @@ public class FSParentQueue extends FSQueue {
|
||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
for (FSQueue queue : childQueues) {
|
for (FSQueue queue : childQueues) {
|
||||||
|
// Skip selection for non-preemptable queue
|
||||||
|
if (!queue.isPreemptable()) {
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("skipping from queue=" + getName()
|
||||||
|
+ " because it's a non-preemptable queue");
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
if (candidateQueue == null ||
|
if (candidateQueue == null ||
|
||||||
comparator.compare(queue, candidateQueue) > 0) {
|
comparator.compare(queue, candidateQueue) > 0) {
|
||||||
candidateQueue = queue;
|
candidateQueue = queue;
|
||||||
|
|
|
@ -62,6 +62,7 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
private long minSharePreemptionTimeout = Long.MAX_VALUE;
|
private long minSharePreemptionTimeout = Long.MAX_VALUE;
|
||||||
private float fairSharePreemptionThreshold = 0.5f;
|
private float fairSharePreemptionThreshold = 0.5f;
|
||||||
|
private boolean preemptable = true;
|
||||||
|
|
||||||
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
|
public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
@ -235,6 +236,10 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
|
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPreemptable() {
|
||||||
|
return preemptable;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Recomputes the shares for all child queues and applications based on this
|
* Recomputes the shares for all child queues and applications based on this
|
||||||
* queue's current share
|
* queue's current share
|
||||||
|
@ -242,7 +247,8 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
public abstract void recomputeShares();
|
public abstract void recomputeShares();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update the min/fair share preemption timeouts and threshold for this queue.
|
* Update the min/fair share preemption timeouts, threshold and preemption
|
||||||
|
* disabled flag for this queue.
|
||||||
*/
|
*/
|
||||||
public void updatePreemptionVariables() {
|
public void updatePreemptionVariables() {
|
||||||
// For min share timeout
|
// For min share timeout
|
||||||
|
@ -263,6 +269,9 @@ public abstract class FSQueue implements Queue, Schedulable {
|
||||||
if (fairSharePreemptionThreshold < 0 && parent != null) {
|
if (fairSharePreemptionThreshold < 0 && parent != null) {
|
||||||
fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
|
fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
|
||||||
}
|
}
|
||||||
|
// Whether preemption from this queue is allowed
|
||||||
|
preemptable = scheduler.getAllocationConfiguration()
|
||||||
|
.isPreemptable(getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -81,6 +81,7 @@ public class FairSchedulerPage extends RmView {
|
||||||
}
|
}
|
||||||
ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString());
|
ri._(STEADY_FAIR_SHARE + ":", qinfo.getSteadyFairShare().toString());
|
||||||
ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString());
|
ri._(INSTANTANEOUS_FAIR_SHARE + ":", qinfo.getFairShare().toString());
|
||||||
|
ri._("Preemptable:", qinfo.isPreemptable());
|
||||||
html._(InfoBlock.class);
|
html._(InfoBlock.class);
|
||||||
|
|
||||||
// clear the info contents so this queue's info doesn't accumulate into another queue's info
|
// clear the info contents so this queue's info doesn't accumulate into another queue's info
|
||||||
|
|
|
@ -65,6 +65,8 @@ public class FairSchedulerQueueInfo {
|
||||||
private String queueName;
|
private String queueName;
|
||||||
private String schedulingPolicy;
|
private String schedulingPolicy;
|
||||||
|
|
||||||
|
private boolean preemptable;
|
||||||
|
|
||||||
private FairSchedulerQueueInfoList childQueues;
|
private FairSchedulerQueueInfoList childQueues;
|
||||||
|
|
||||||
public FairSchedulerQueueInfo() {
|
public FairSchedulerQueueInfo() {
|
||||||
|
@ -108,6 +110,7 @@ public class FairSchedulerQueueInfo {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
preemptable = queue.isPreemptable();
|
||||||
childQueues = getChildQueues(queue, scheduler);
|
childQueues = getChildQueues(queue, scheduler);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -228,4 +231,8 @@ public class FairSchedulerQueueInfo {
|
||||||
return childQueues != null ? childQueues.getQueueInfoList() :
|
return childQueues != null ? childQueues.getQueueInfoList() :
|
||||||
new ArrayList<FairSchedulerQueueInfo>();
|
new ArrayList<FairSchedulerQueueInfo>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isPreemptable() {
|
||||||
|
return preemptable;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2492,6 +2492,333 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
||||||
1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
1536, scheduler.resourceDeficit(queueC, clock.getTime()).getMemory());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* Tests the decision to preempt tasks with respect to non-preemptable queues
|
||||||
|
* 1, Queues as follow:
|
||||||
|
* queueA(non-preemptable)
|
||||||
|
* queueB(preemptable)
|
||||||
|
* parentQueue(non-preemptable)
|
||||||
|
* --queueC(preemptable)
|
||||||
|
* queueD(preemptable)
|
||||||
|
* 2, Submit request to queueA, queueB, queueC, and all of them are over MinShare
|
||||||
|
* 3, Now all resources are occupied
|
||||||
|
* 4, Submit request to queueD, and need to preempt resource from other queues
|
||||||
|
* 5, Only preemptable queue(queueB) would be preempted.
|
||||||
|
*/
|
||||||
|
public void testPreemptionDecisionWithNonPreemptableQueue() throws Exception {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
ControlledClock clock = new ControlledClock();
|
||||||
|
scheduler.setClock(clock);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"default\">");
|
||||||
|
out.println("<maxResources>0mb,0vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueA\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueB\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"parentQueue\">");
|
||||||
|
out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
||||||
|
out.println("<queue name=\"queueC\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueD\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>2048mb,0vcores</minResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
||||||
|
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
||||||
|
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// Create four nodes(3G each)
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
|
||||||
|
"127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
|
||||||
|
"127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
|
RMNode node3 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
|
||||||
|
"127.0.0.3");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
||||||
|
scheduler.handle(nodeEvent3);
|
||||||
|
|
||||||
|
RMNode node4 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
|
||||||
|
"127.0.0.4");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
|
||||||
|
scheduler.handle(nodeEvent4);
|
||||||
|
|
||||||
|
// Submit apps to queueA, queueB, queueC,
|
||||||
|
// now all resources of the cluster are occupied
|
||||||
|
ApplicationAttemptId app1 =
|
||||||
|
createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
|
||||||
|
ApplicationAttemptId app2 =
|
||||||
|
createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 2);
|
||||||
|
ApplicationAttemptId app3 =
|
||||||
|
createSchedulingRequest(1 * 1024, "parentQueue.queueC", "user1", 4, 3);
|
||||||
|
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
// Sufficient node check-ins to fully schedule containers
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeUpdate1);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeUpdate2);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
||||||
|
scheduler.handle(nodeUpdate3);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
||||||
|
scheduler.handle(nodeUpdate4);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
||||||
|
|
||||||
|
// Now new requests arrive from queues D
|
||||||
|
ApplicationAttemptId app4 =
|
||||||
|
createSchedulingRequest(1 * 1024, "queueD", "user1", 4, 1);
|
||||||
|
scheduler.update();
|
||||||
|
FSLeafQueue schedD =
|
||||||
|
scheduler.getQueueManager().getLeafQueue("queueD", true);
|
||||||
|
|
||||||
|
// After minSharePreemptionTime has passed, 2G resource should be preempted from
|
||||||
|
// queueB to queueD
|
||||||
|
clock.tickSec(6);
|
||||||
|
assertEquals(2048,
|
||||||
|
scheduler.resourceDeficit(schedD, clock.getTime()).getMemory());
|
||||||
|
|
||||||
|
scheduler.preemptResources(Resources.createResource(2 * 1024));
|
||||||
|
// now only app2 is selected to be preempted
|
||||||
|
assertTrue("App2 should have container to be preempted",
|
||||||
|
!Collections.disjoint(
|
||||||
|
scheduler.getSchedulerApp(app2).getLiveContainers(),
|
||||||
|
scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
||||||
|
assertTrue("App1 should not have container to be preempted",
|
||||||
|
Collections.disjoint(
|
||||||
|
scheduler.getSchedulerApp(app1).getLiveContainers(),
|
||||||
|
scheduler.getSchedulerApp(app1).getPreemptionContainers()));
|
||||||
|
assertTrue("App3 should not have container to be preempted",
|
||||||
|
Collections.disjoint(
|
||||||
|
scheduler.getSchedulerApp(app3).getLiveContainers(),
|
||||||
|
scheduler.getSchedulerApp(app3).getPreemptionContainers()));
|
||||||
|
// Pretend 20 seconds have passed
|
||||||
|
clock.tickSec(20);
|
||||||
|
scheduler.preemptResources(Resources.createResource(2 * 1024));
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeUpdate1);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeUpdate2);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
||||||
|
scheduler.handle(nodeUpdate3);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
||||||
|
scheduler.handle(nodeUpdate4);
|
||||||
|
}
|
||||||
|
// after preemption
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
||||||
|
assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
||||||
|
assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* Tests the decision to preempt tasks when allowPreemptionFrom is set false on
|
||||||
|
* all queues.
|
||||||
|
* Then none of them would be preempted actually.
|
||||||
|
* 1, Queues as follow:
|
||||||
|
* queueA(non-preemptable)
|
||||||
|
* queueB(non-preemptable)
|
||||||
|
* parentQueue1(non-preemptable)
|
||||||
|
* --queueC(preemptable)
|
||||||
|
* parentQueue2(preemptable)
|
||||||
|
* --queueD(non-preemptable)
|
||||||
|
* 2, Submit request to queueB, queueC, queueD, and all of them are over MinShare
|
||||||
|
* 3, Now all resources are occupied
|
||||||
|
* 4, Submit request to queueA, and need to preempt resource from other queues
|
||||||
|
* 5, None of queues would be preempted.
|
||||||
|
*/
|
||||||
|
public void testPreemptionDecisionWhenPreemptionDisabledOnAllQueues()
|
||||||
|
throws Exception {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
ControlledClock clock = new ControlledClock();
|
||||||
|
scheduler.setClock(clock);
|
||||||
|
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"default\">");
|
||||||
|
out.println("<maxResources>0mb,0vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueA\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>2048mb,0vcores</minResources>");
|
||||||
|
out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queueB\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"parentQueue1\">");
|
||||||
|
out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
||||||
|
out.println("<queue name=\"queueC\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"parentQueue2\">");
|
||||||
|
out.println("<queue name=\"queueD\">");
|
||||||
|
out.println("<weight>.25</weight>");
|
||||||
|
out.println("<minResources>1024mb,0vcores</minResources>");
|
||||||
|
out.println("<allowPreemptionFrom>false</allowPreemptionFrom>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<defaultMinSharePreemptionTimeout>5</defaultMinSharePreemptionTimeout>");
|
||||||
|
out.println("<defaultFairSharePreemptionTimeout>10</defaultFairSharePreemptionTimeout>");
|
||||||
|
out.println("<defaultFairSharePreemptionThreshold>.5</defaultFairSharePreemptionThreshold>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// Create four nodes(3G each)
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 1,
|
||||||
|
"127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
RMNode node2 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 2,
|
||||||
|
"127.0.0.2");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeEvent2);
|
||||||
|
|
||||||
|
RMNode node3 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 3,
|
||||||
|
"127.0.0.3");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
|
||||||
|
scheduler.handle(nodeEvent3);
|
||||||
|
|
||||||
|
RMNode node4 =
|
||||||
|
MockNodes.newNodeInfo(1, Resources.createResource(3 * 1024, 3), 4,
|
||||||
|
"127.0.0.4");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
|
||||||
|
scheduler.handle(nodeEvent4);
|
||||||
|
|
||||||
|
// Submit apps to queueB, queueC, queueD
|
||||||
|
// now all resource of the cluster is occupied
|
||||||
|
|
||||||
|
ApplicationAttemptId app1 =
|
||||||
|
createSchedulingRequest(1 * 1024, "queueB", "user1", 4, 1);
|
||||||
|
ApplicationAttemptId app2 =
|
||||||
|
createSchedulingRequest(1 * 1024, "parentQueue1.queueC", "user1", 4, 2);
|
||||||
|
ApplicationAttemptId app3 =
|
||||||
|
createSchedulingRequest(1 * 1024, "parentQueue2.queueD", "user1", 4, 3);
|
||||||
|
scheduler.update();
|
||||||
|
|
||||||
|
// Sufficient node check-ins to fully schedule containers
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeUpdate1);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeUpdate2);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
||||||
|
scheduler.handle(nodeUpdate3);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
||||||
|
scheduler.handle(nodeUpdate4);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
||||||
|
|
||||||
|
// Now new requests arrive from queues A
|
||||||
|
ApplicationAttemptId app4 =
|
||||||
|
createSchedulingRequest(1 * 1024, "queueA", "user1", 4, 1);
|
||||||
|
scheduler.update();
|
||||||
|
FSLeafQueue schedA =
|
||||||
|
scheduler.getQueueManager().getLeafQueue("queueA", true);
|
||||||
|
|
||||||
|
// After minSharePreemptionTime has passed, resource deficit is 2G
|
||||||
|
clock.tickSec(6);
|
||||||
|
assertEquals(2048,
|
||||||
|
scheduler.resourceDeficit(schedA, clock.getTime()).getMemory());
|
||||||
|
|
||||||
|
scheduler.preemptResources(Resources.createResource(2 * 1024));
|
||||||
|
// now no app is selected to be preempted
|
||||||
|
assertTrue("App1 should have container to be preempted",
|
||||||
|
Collections.disjoint(
|
||||||
|
scheduler.getSchedulerApp(app1).getLiveContainers(),
|
||||||
|
scheduler.getSchedulerApp(app1).getPreemptionContainers()));
|
||||||
|
assertTrue("App2 should not have container to be preempted",
|
||||||
|
Collections.disjoint(
|
||||||
|
scheduler.getSchedulerApp(app2).getLiveContainers(),
|
||||||
|
scheduler.getSchedulerApp(app2).getPreemptionContainers()));
|
||||||
|
assertTrue("App3 should not have container to be preempted",
|
||||||
|
Collections.disjoint(
|
||||||
|
scheduler.getSchedulerApp(app3).getLiveContainers(),
|
||||||
|
scheduler.getSchedulerApp(app3).getPreemptionContainers()));
|
||||||
|
// Pretend 20 seconds have passed
|
||||||
|
clock.tickSec(20);
|
||||||
|
scheduler.preemptResources(Resources.createResource(2 * 1024));
|
||||||
|
for (int i = 0; i < 3; i++) {
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeUpdate1);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
|
||||||
|
scheduler.handle(nodeUpdate2);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
|
||||||
|
scheduler.handle(nodeUpdate3);
|
||||||
|
|
||||||
|
NodeUpdateSchedulerEvent nodeUpdate4 = new NodeUpdateSchedulerEvent(node4);
|
||||||
|
scheduler.handle(nodeUpdate4);
|
||||||
|
}
|
||||||
|
// after preemption
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app1).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app2).getLiveContainers().size());
|
||||||
|
assertEquals(4, scheduler.getSchedulerApp(app3).getLiveContainers().size());
|
||||||
|
assertEquals(0, scheduler.getSchedulerApp(app4).getLiveContainers().size());
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
|
public void testBackwardsCompatiblePreemptionConfiguration() throws Exception {
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
|
Loading…
Reference in New Issue