YARN-596. Use scheduling policies throughout the queue hierarchy to decide which containers to preempt (Wei Yan via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1598198 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2014-05-29 04:03:04 +00:00
parent e82172c455
commit ed77c8925d
15 changed files with 309 additions and 162 deletions

View File

@ -99,6 +99,9 @@ Release 2.5.0 - UNRELEASED
YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package. (Vinod
Kumar Vavilapalli via zjshen)
YARN-596. Use scheduling policies throughout the queue hierarchy to decide
which containers to preempt (Wei Yan via Sandy Ryza)
OPTIMIZATIONS
BUG FIXES

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -31,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
@ -58,6 +58,8 @@ public class AppSchedulable extends Schedulable {
private Priority priority;
private ResourceWeights resourceWeights;
private RMContainerComparator comparator = new RMContainerComparator();
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
@ -111,7 +113,10 @@ public class AppSchedulable extends Schedulable {
@Override
public Resource getResourceUsage() {
return app.getCurrentConsumption();
// Here the getPreemptedResources() always return zero, except in
// a preemption round
return Resources.subtract(app.getCurrentConsumption(),
app.getPreemptedResources());
}
@ -383,6 +388,27 @@ public class AppSchedulable extends Schedulable {
return assignContainer(node, false);
}
/**
* Preempt a running container according to the priority
*/
@Override
public RMContainer preemptContainer() {
if (LOG.isDebugEnabled()) {
LOG.debug("App " + getName() + " is going to preempt a running " +
"container");
}
RMContainer toBePreempted = null;
for (RMContainer container : app.getLiveContainers()) {
if (! app.getPreemptionContainers().contains(container) &&
(toBePreempted == null ||
comparator.compare(toBePreempted, container) > 0)) {
toBePreempted = container;
}
}
return toBePreempted;
}
/**
* Whether this app has containers requests that could be satisfied on the
* given node, if the node had full space.
@ -407,4 +433,17 @@ public class AppSchedulable extends Schedulable {
Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
anyRequest.getCapability(), node.getRMNode().getTotalCapability());
}
static class RMContainerComparator implements Comparator<RMContainer>,
Serializable {
@Override
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
}
}

View File

@ -33,10 +33,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
@Private
@Unstable
@ -208,6 +208,36 @@ public class FSLeafQueue extends FSQueue {
return assigned;
}
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
if (LOG.isDebugEnabled()) {
LOG.debug("Queue " + getName() + " is going to preempt a container " +
"from its applications.");
}
// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
return toBePreempted;
}
// Choose the app that is most over fair share
Comparator<Schedulable> comparator = policy.getComparator();
AppSchedulable candidateSched = null;
for (AppSchedulable sched : runnableAppScheds) {
if (candidateSched == null ||
comparator.compare(sched, candidateSched) > 0) {
candidateSched = sched;
}
}
// Preempt from the selected app
if (candidateSched != null) {
toBePreempted = candidateSched.preemptContainer();
}
return toBePreempted;
}
@Override
public List<FSQueue> getChildQueues() {
return new ArrayList<FSQueue>(1);

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import org.apache.commons.logging.Log;
@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
@ -156,6 +158,32 @@ public class FSParentQueue extends FSQueue {
return assigned;
}
@Override
public RMContainer preemptContainer() {
RMContainer toBePreempted = null;
// If this queue is not over its fair share, reject
if (!preemptContainerPreCheck()) {
return toBePreempted;
}
// Find the childQueue which is most over fair share
FSQueue candidateQueue = null;
Comparator<Schedulable> comparator = policy.getComparator();
for (FSQueue queue : childQueues) {
if (candidateQueue == null ||
comparator.compare(queue, candidateQueue) > 0) {
candidateQueue = queue;
}
}
// Let the selected queue choose which of its container to preempt
if (candidateQueue != null) {
toBePreempted = candidateQueue.preemptContainer();
}
return toBePreempted;
}
@Override
public List<FSQueue> getChildQueues() {
return childQueues;

View File

@ -187,4 +187,17 @@ public abstract class FSQueue extends Schedulable implements Queue {
}
return true;
}
/**
* Helper method to check if the queue should preempt containers
*
* @return true if check passes (can preempt) or false otherwise
*/
protected boolean preemptContainerPreCheck() {
if (this == scheduler.getQueueManager().getRootQueue()) {
return true;
}
return parent.getPolicy()
.checkIfUsageOverFairShare(getResourceUsage(), getFairShare());
}
}

View File

@ -59,6 +59,8 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
private AppSchedulable appSchedulable;
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
private Resource preemptedResources = Resources.createResource(0);
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
@ -316,6 +318,7 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
public void addPreemption(RMContainer container, long time) {
assert preemptionMap.get(container) == null;
preemptionMap.put(container, time);
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
public Long getContainerPreemptionTime(RMContainer container) {
@ -330,4 +333,20 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt {
public FSLeafQueue getQueue() {
return (FSLeafQueue)super.getQueue();
}
public Resource getPreemptedResources() {
return preemptedResources;
}
public void resetPreemptedResources() {
preemptedResources = Resources.createResource(0);
for (RMContainer container : getPreemptionContainers()) {
Resources.addTo(preemptedResources, container.getAllocatedResource());
}
}
public void clearPreemptedResources() {
preemptedResources.setMemory(0);
preemptedResources.setVirtualCores(0);
}
}

View File

@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -337,94 +334,78 @@ public class FairScheduler extends
}
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt,
Resources.none())) {
preemptResources(queueMgr.getLeafQueues(), resToPreempt);
preemptResources(resToPreempt);
}
}
/**
* Preempt a quantity of resources from a list of QueueSchedulables. The
* policy for this is to pick apps from queues that are over their fair share,
* but make sure that no queue is placed below its fair share in the process.
* We further prioritize preemption by choosing containers with lowest
* priority to preempt.
* Preempt a quantity of resources. Each round, we start from the root queue,
* level-by-level, until choosing a candidate application.
* The policy for prioritizing preemption for each queue depends on its
* SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is
* most over its fair share; (2) FIFO, choose the childSchedulable that is
* latest launched.
* Inside each application, we further prioritize preemption by choosing
* containers with lowest priority to preempt.
* We make sure that no queue is placed below its fair share in the process.
*/
protected void preemptResources(Collection<FSLeafQueue> scheds,
Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
protected void preemptResources(Resource toPreempt) {
if (Resources.equals(toPreempt, Resources.none())) {
return;
}
Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>();
Map<RMContainer, FSLeafQueue> queues =
new HashMap<RMContainer, FSLeafQueue>();
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), sched.getFairShare())) {
for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
queues.put(c, sched);
}
}
}
}
// Sort containers into reverse order of priority
Collections.sort(runningContainers, new Comparator<RMContainer>() {
public int compare(RMContainer c1, RMContainer c2) {
int ret = c1.getContainer().getPriority().compareTo(
c2.getContainer().getPriority());
if (ret == 0) {
return c2.getContainerId().compareTo(c1.getContainerId());
}
return ret;
}
});
// Scan down the list of containers we've already warned and kill them
// if we need to. Remove any containers from the list that we don't need
// or that are no longer running.
Iterator<RMContainer> warnedIter = warnedContainers.iterator();
Set<RMContainer> preemptedThisRound = new HashSet<RMContainer>();
while (warnedIter.hasNext()) {
RMContainer container = warnedIter.next();
if (container.getState() == RMContainerState.RUNNING &&
if ((container.getState() == RMContainerState.RUNNING ||
container.getState() == RMContainerState.ALLOCATED) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
warnOrKillContainer(container, apps.get(container), queues.get(container));
preemptedThisRound.add(container);
warnOrKillContainer(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
} else {
warnedIter.remove();
}
}
// Scan down the rest of the containers until we've preempted enough, making
// sure we don't preempt too many from any queue
Iterator<RMContainer> runningIter = runningContainers.iterator();
while (runningIter.hasNext() &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
RMContainer container = runningIter.next();
FSLeafQueue sched = queues.get(container);
if (!preemptedThisRound.contains(container) &&
Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
sched.getResourceUsage(), sched.getFairShare())) {
warnOrKillContainer(container, apps.get(container), sched);
warnedContainers.add(container);
Resources.subtractFrom(toPreempt, container.getContainer().getResource());
try {
// Reset preemptedResource for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
app.getApp().resetPreemptedResources();
}
}
while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource,
toPreempt, Resources.none())) {
RMContainer container =
getQueueManager().getRootQueue().preemptContainer();
if (container == null) {
break;
} else {
warnOrKillContainer(container);
warnedContainers.add(container);
Resources.subtractFrom(
toPreempt, container.getContainer().getResource());
}
}
} finally {
// Clear preemptedResources for each app
for (FSLeafQueue queue : getQueueManager().getLeafQueues()) {
for (AppSchedulable app : queue.getRunnableAppSchedulables()) {
app.getApp().clearPreemptedResources();
}
}
}
}
private void warnOrKillContainer(RMContainer container, FSSchedulerApp app,
FSLeafQueue queue) {
private void warnOrKillContainer(RMContainer container) {
ApplicationAttemptId appAttemptId = container.getApplicationAttemptId();
FSSchedulerApp app = getSchedulerApp(appAttemptId);
FSLeafQueue queue = app.getQueue();
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
") from queue " + queue.getName());

View File

@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@ -100,6 +101,11 @@ public abstract class Schedulable {
*/
public abstract Resource assignContainer(FSSchedulerNode node);
/**
* Preempt a container from this Schedulable if possible.
*/
public abstract RMContainer preemptContainer();
/** Assign a fair share to this Schedulable. */
public void setFairShare(Resource fairShare) {
this.fairShare = fairShare;

View File

@ -139,4 +139,14 @@ public abstract class SchedulingPolicy {
*/
public abstract void computeShares(
Collection<? extends Schedulable> schedulables, Resource totalResources);
/**
* Check if the resource usage is over the fair share under this policy
*
* @param usage {@link Resource} the resource usage
* @param fairShare {@link Resource} the fair share
* @return true if check passes (is over) or false otherwise
*/
public abstract boolean checkIfUsageOverFairShare(
Resource usage, Resource fairShare);
}

View File

@ -69,6 +69,11 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
}
}
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return !Resources.fitsIn(usage, fairShare);
}
@Override
public void initialize(Resource clusterCapacity) {
comparator.setClusterCapacity(clusterCapacity);

View File

@ -119,6 +119,11 @@ public class FairSharePolicy extends SchedulingPolicy {
ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
}
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare);
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_ANY;

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -88,6 +87,13 @@ public class FifoPolicy extends SchedulingPolicy {
earliest.setFairShare(Resources.clone(totalResources));
}
@Override
public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) {
throw new UnsupportedOperationException(
"FifoPolicy doesn't support checkIfUsageOverFairshare operation, " +
"as FifoPolicy only works for FSLeafQueue.");
}
@Override
public byte getApplicableDepth() {
return SchedulingPolicy.DEPTH_LEAF;

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -83,6 +84,11 @@ public class FakeSchedulable extends Schedulable {
return null;
}
@Override
public RMContainer preemptContainer() {
return null;
}
@Override
public Resource getDemand() {
return null;

View File

@ -1029,13 +1029,13 @@ public class TestFairScheduler extends FairSchedulerTestBase {
@Test (timeout = 5000)
/**
* Make sure containers are chosen to be preempted in the correct order. Right
* now this means decreasing order of priority.
* Make sure containers are chosen to be preempted in the correct order.
*/
public void testChoiceOfPreemptedContainers() throws Exception {
conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000);
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000);
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE);
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
MockClock clock = new MockClock();
scheduler.setClock(clock);
@ -1052,7 +1052,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
out.println("<queue name=\"queueC\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("<queue name=\"queueD\">");
out.println("<queue name=\"default\">");
out.println("<weight>.25</weight>");
out.println("</queue>");
out.println("</allocations>");
@ -1060,133 +1060,132 @@ public class TestFairScheduler extends FairSchedulerTestBase {
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Create four nodes
// Create two nodes
RMNode node1 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1,
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1,
"127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
RMNode node2 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2,
MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2,
"127.0.0.2");
NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
scheduler.handle(nodeEvent2);
RMNode node3 =
MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3,
"127.0.0.3");
NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
scheduler.handle(nodeEvent3);
// Queue A and B each request three containers
// Queue A and B each request two applications
ApplicationAttemptId app1 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1);
createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1);
createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1);
ApplicationAttemptId app2 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2);
ApplicationAttemptId app3 =
createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3);
createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3);
createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2);
ApplicationAttemptId app3 =
createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1);
createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3);
ApplicationAttemptId app4 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1);
ApplicationAttemptId app5 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2);
ApplicationAttemptId app6 =
createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3);
createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3);
createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4);
scheduler.update();
scheduler.getQueueManager().getLeafQueue("queueA", true)
.setPolicy(SchedulingPolicy.parse("fifo"));
scheduler.getQueueManager().getLeafQueue("queueB", true)
.setPolicy(SchedulingPolicy.parse("fair"));
// Sufficient node check-ins to fully schedule containers
for (int i = 0; i < 2; i++) {
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
for (int i = 0; i < 4; i++) {
scheduler.handle(nodeUpdate1);
NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2);
scheduler.handle(nodeUpdate2);
NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3);
scheduler.handle(nodeUpdate3);
}
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size());
// Now new requests arrive from queues C and D
ApplicationAttemptId app7 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1);
ApplicationAttemptId app8 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2);
ApplicationAttemptId app9 =
createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3);
ApplicationAttemptId app10 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1);
ApplicationAttemptId app11 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2);
ApplicationAttemptId app12 =
createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3);
assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size());
// Now new requests arrive from queueC and default
createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1);
createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1);
scheduler.update();
// We should be able to claw back one container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size());
// First verify we are adding containers to preemption list for the application
assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(),
scheduler.getSchedulerApp(app3).getPreemptionContainers()));
assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(),
scheduler.getSchedulerApp(app6).getPreemptionContainers()));
// We should be able to claw back one container from queueA and queueB each.
scheduler.preemptResources(Resources.createResource(2 * 1024));
assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size());
// First verify we are adding containers to preemption list for the app.
// For queueA (fifo), app2 is selected.
// For queueB (fair), app4 is selected.
assertTrue("App2 should have container to be preempted",
!Collections.disjoint(
scheduler.getSchedulerApp(app2).getLiveContainers(),
scheduler.getSchedulerApp(app2).getPreemptionContainers()));
assertTrue("App4 should have container to be preempted",
!Collections.disjoint(
scheduler.getSchedulerApp(app2).getLiveContainers(),
scheduler.getSchedulerApp(app2).getPreemptionContainers()));
// Pretend 15 seconds have passed
clock.tick(15);
// Trigger a kill by insisting we want containers back
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
scheduler.preemptResources(Resources.createResource(2 * 1024));
// At this point the containers should have been killed (since we are not simulating AM)
assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
// Inside each app, containers are sorted according to their priorities.
// Containers with priority 4 are preempted for app2 and app4.
Set<RMContainer> set = new HashSet<RMContainer>();
for (RMContainer container :
scheduler.getSchedulerApp(app2).getLiveContainers()) {
if (container.getAllocatedPriority().getPriority() == 4) {
set.add(container);
}
}
for (RMContainer container :
scheduler.getSchedulerApp(app4).getLiveContainers()) {
if (container.getAllocatedPriority().getPriority() == 4) {
set.add(container);
}
}
assertTrue("Containers with priority=4 in app2 and app4 should be " +
"preempted.", set.isEmpty());
// Trigger a kill by insisting we want containers back
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
scheduler.preemptResources(Resources.createResource(2 * 1024));
// Pretend 15 seconds have passed
clock.tick(15);
// We should be able to claw back another container from A and B each.
// Make sure it is lowest priority container.
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
// For queueA (fifo), continue preempting from app2.
// For queueB (fair), even app4 has a lowest priority container with p=4, it
// still preempts from app3 as app3 is most over fair share.
scheduler.preemptResources(Resources.createResource(2 * 1024));
assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
// Now A and B are below fair share, so preemption shouldn't do anything
scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(),
Resources.createResource(2 * 1024));
assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size());
assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size());
assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size());
scheduler.preemptResources(Resources.createResource(2 * 1024));
assertTrue("App1 should have no container to be preempted",
scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty());
assertTrue("App2 should have no container to be preempted",
scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty());
assertTrue("App3 should have no container to be preempted",
scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty());
assertTrue("App4 should have no container to be preempted",
scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty());
}
@Test (timeout = 5000)

View File

@ -35,10 +35,8 @@ import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
@ -51,8 +49,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase {
public int lastPreemptMemory = -1;
@Override
protected void preemptResources(
Collection<FSLeafQueue> scheds, Resource toPreempt) {
protected void preemptResources(Resource toPreempt) {
lastPreemptMemory = toPreempt.getMemory();
}