YARN-5831. FairScheduler: Propagate allowPreemptionFrom flag all the way down to the app. (Yufei Gu via kasha)

(cherry picked from commit e224c96234)
This commit is contained in:
Karthik Kambatla 2017-01-17 17:01:03 -08:00 committed by Karthik Kambatla
parent 0ceb7149d9
commit 71026cc3f6
9 changed files with 65 additions and 44 deletions

View File

@ -567,6 +567,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
}
boolean canContainerBePreempted(RMContainer container) {
if (!isPreemptable()) {
return false;
}
// Sanity check that the app owns this container
if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
!newlyAllocatedContainers.contains(container)) {
@ -580,17 +584,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
return false;
}
// Check if any of the parent queues are not preemptable
// TODO (YARN-5831): Propagate the "preemptable" flag all the way down to
// the app to avoid recursing up every time.
for (FSQueue q = getQueue();
!q.getQueueName().equals("root");
q = q.getParent()) {
if (!q.isPreemptable()) {
return false;
}
}
// Check if the app's allocation will be over its fairshare even
// after preempting this container
Resource currentUsage = getResourceUsage();
@ -1233,4 +1226,9 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
public boolean equals(Object o) {
return super.equals(o);
}
@Override
public boolean isPreemptable() {
return getQueue().isPreemptable();
}
}

View File

@ -108,21 +108,6 @@ public class FSParentQueue extends FSQueue {
}
}
@Override
public void updatePreemptionVariables() {
super.updatePreemptionVariables();
// For child queues
readLock.lock();
try {
for (FSQueue childQueue : childQueues) {
childQueue.updatePreemptionVariables();
}
} finally {
readLock.unlock();
}
}
@Override
public Resource getDemand() {
readLock.lock();

View File

@ -80,6 +80,7 @@ public abstract class FSQueue implements Queue, Schedulable {
this.scheduler = scheduler;
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
this.parent = parent;
reinit(false);
}
/**
@ -87,10 +88,19 @@ public abstract class FSQueue implements Queue, Schedulable {
* metrics.
* This function is invoked when a new queue is created or reloading the
* allocation configuration.
*
* @param recursive whether child queues should be reinitialized recursively
*/
public void init() {
public void reinit(boolean recursive) {
AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
allocConf.initFSQueue(this, scheduler);
updatePreemptionVariables();
if (recursive) {
for (FSQueue child : getChildQueues()) {
child.reinit(recursive);
}
}
}
public String getName() {
@ -293,6 +303,7 @@ public abstract class FSQueue implements Queue, Schedulable {
this.fairSharePreemptionThreshold = fairSharePreemptionThreshold;
}
@Override
public boolean isPreemptable() {
return preemptable;
}
@ -315,7 +326,7 @@ public abstract class FSQueue implements Queue, Schedulable {
* Update the min/fair share preemption timeouts, threshold and preemption
* disabled flag for this queue.
*/
public void updatePreemptionVariables() {
private void updatePreemptionVariables() {
// For min share timeout
minSharePreemptionTimeout = scheduler.getAllocationConfiguration()
.getMinSharePreemptionTimeout(getName());
@ -334,9 +345,15 @@ public abstract class FSQueue implements Queue, Schedulable {
if (fairSharePreemptionThreshold < 0 && parent != null) {
fairSharePreemptionThreshold = parent.getFairSharePreemptionThreshold();
}
// For option whether allow preemption from this queue
preemptable = scheduler.getAllocationConfiguration()
.isPreemptable(getName());
// For option whether allow preemption from this queue.
// If the parent is non-preemptable, this queue is non-preemptable as well,
// otherwise get the value from the allocation file.
if (parent != null && !parent.isPreemptable()) {
preemptable = false;
} else {
preemptable = scheduler.getAllocationConfiguration()
.isPreemptable(getName());
}
}
/**

View File

@ -73,11 +73,12 @@ public class QueueManager {
public void initialize(Configuration conf) throws IOException,
SAXException, AllocationConfigurationException, ParserConfigurationException {
rootQueue = new FSParentQueue("root", scheduler, null);
rootQueue.init();
queues.put(rootQueue.getName(), rootQueue);
// Create the default queue
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
// Recursively reinitialize to propagate queue properties
rootQueue.reinit(true);
}
/**
@ -281,11 +282,9 @@ public class QueueManager {
queue = newParent;
}
queue.init();
parent.addChildQueue(queue);
setChildResourceLimits(parent, queue, queueConf);
queues.put(queue.getName(), queue);
queue.updatePreemptionVariables();
// If we just created a leaf node, the newParent is null, but that's OK
// because we only create a leaf node in the very last iteration.
@ -496,17 +495,11 @@ public class QueueManager {
}
}
}
rootQueue.recomputeSteadyShares();
for (FSQueue queue : queues.values()) {
queue.init();
}
// Initialize all queues recursively
rootQueue.reinit(true);
// Update steady fair shares for all queues
rootQueue.recomputeSteadyShares();
// Update the fair share preemption timeouts and preemption for all queues
// recursively
rootQueue.updatePreemptionVariables();
}
/**

View File

@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
/**
* A Schedulable represents an entity that can be scheduled such as an
@ -96,4 +95,11 @@ public interface Schedulable {
/** Assign a fair share to this Schedulable. */
void setFairShare(Resource fairShare);
/**
* Check whether the schedulable is preemptable.
* @return <code>true</code> if the schedulable is preemptable;
* <code>false</code> otherwise
*/
boolean isPreemptable();
}

View File

@ -137,4 +137,9 @@ public class FakeSchedulable implements Schedulable {
@Override
public void updateDemand() {}
@Override
public boolean isPreemptable() {
return true;
}
}

View File

@ -81,7 +81,6 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
String queueName = "root.queue1";
FSLeafQueue schedulable = new FSLeafQueue(queueName, scheduler, null);
schedulable.init();
schedulable.setMaxShare(maxResource);
assertEquals(schedulable.getMetrics().getMaxApps(), Integer.MAX_VALUE);
assertEquals(schedulable.getMetrics().getSchedulingPolicy(),

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.After;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Test;
@ -166,6 +167,14 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
// Create and add two nodes to the cluster
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
addNode(NODE_CAPACITY_MULTIPLE * 1024, NODE_CAPACITY_MULTIPLE);
// Verify if child-1 and child-2 are preemptable
FSQueue child1 =
scheduler.getQueueManager().getQueue("nonpreemptable.child-1");
assertFalse(child1.isPreemptable());
FSQueue child2 =
scheduler.getQueueManager().getQueue("nonpreemptable.child-2");
assertFalse(child2.isPreemptable());
}
private void sendEnoughNodeUpdatesToAssignFully() {
@ -197,6 +206,10 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
scheduler.update();
sendEnoughNodeUpdatesToAssignFully();
assertEquals(8, greedyApp.getLiveContainers().size());
// Verify preemptable for queue and app attempt
assertTrue(
scheduler.getQueueManager().getQueue(queue1).isPreemptable()
== greedyApp.isPreemptable());
// Create an app that takes up all the resources on the cluster
ApplicationAttemptId appAttemptId2

View File

@ -345,6 +345,11 @@ public class TestSchedulingPolicy {
", weights:" + weights + ", demand:" + demand +
", minShare:" + minShare + "}";
}
@Override
public boolean isPreemptable() {
return true;
}
}
}