diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 32ee6179708..585545b2333 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -114,6 +114,9 @@ Release 2.5.0 - UNRELEASED YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package. (Vinod Kumar Vavilapalli via zjshen) + YARN-596. Use scheduling policies throughout the queue hierarchy to decide + which containers to preempt (Wei Yan via Sandy Ryza) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java index 9ed5179270a..4dc0bf4ceb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.io.Serializable; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,8 +33,6 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType; @@ -58,6 +58,8 @@ public class AppSchedulable extends Schedulable { private Priority priority; private ResourceWeights resourceWeights; + private RMContainerComparator comparator = new RMContainerComparator(); + public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) { this.scheduler = scheduler; this.app = app; @@ -111,7 +113,10 @@ public class AppSchedulable extends Schedulable { @Override public Resource getResourceUsage() { - return app.getCurrentConsumption(); + // Here the getPreemptedResources() always return zero, except in + // a preemption round + return Resources.subtract(app.getCurrentConsumption(), + app.getPreemptedResources()); } @@ -383,6 +388,27 @@ public class AppSchedulable extends Schedulable { return assignContainer(node, false); } + /** + * Preempt a running container according to the priority + */ + @Override + public RMContainer preemptContainer() { + if (LOG.isDebugEnabled()) { + LOG.debug("App " + getName() + " is going to preempt a running " + + "container"); + } + + RMContainer toBePreempted = null; + for (RMContainer container : app.getLiveContainers()) { + if (! app.getPreemptionContainers().contains(container) && + (toBePreempted == null || + comparator.compare(toBePreempted, container) > 0)) { + toBePreempted = container; + } + } + return toBePreempted; + } + /** * Whether this app has containers requests that could be satisfied on the * given node, if the node had full space. @@ -407,4 +433,17 @@ public class AppSchedulable extends Schedulable { Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null, anyRequest.getCapability(), node.getRMNode().getTotalCapability()); } + + static class RMContainerComparator implements Comparator, + Serializable { + @Override + public int compare(RMContainer c1, RMContainer c2) { + int ret = c1.getContainer().getPriority().compareTo( + c2.getContainer().getPriority()); + if (ret == 0) { + return c2.getContainerId().compareTo(c1.getContainerId()); + } + return ret; + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index e842a6a3557..fe738da7d46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -33,10 +33,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication; @Private @Unstable @@ -208,6 +208,36 @@ public class FSLeafQueue extends FSQueue { return assigned; } + @Override + public RMContainer preemptContainer() { + RMContainer toBePreempted = null; + if (LOG.isDebugEnabled()) { + LOG.debug("Queue " + getName() + " is going to preempt a container " + + "from its applications."); + } + + // If this queue is not over its fair share, reject + if (!preemptContainerPreCheck()) { + return toBePreempted; + } + + // Choose the app that is most over fair share + Comparator comparator = policy.getComparator(); + AppSchedulable candidateSched = null; + for (AppSchedulable sched : runnableAppScheds) { + if (candidateSched == null || + comparator.compare(sched, candidateSched) > 0) { + candidateSched = sched; + } + } + + // Preempt from the selected app + if (candidateSched != null) { + toBePreempted = candidateSched.preemptContainer(); + } + return toBePreempted; + } + @Override public List getChildQueues() { return new ArrayList(1); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 427cb864579..48db4149634 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.Comparator; import java.util.List; import org.apache.commons.logging.Log; @@ -32,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; @@ -156,6 +158,32 @@ public class FSParentQueue extends FSQueue { return assigned; } + @Override + public RMContainer preemptContainer() { + RMContainer toBePreempted = null; + + // If this queue is not over its fair share, reject + if (!preemptContainerPreCheck()) { + return toBePreempted; + } + + // Find the childQueue which is most over fair share + FSQueue candidateQueue = null; + Comparator comparator = policy.getComparator(); + for (FSQueue queue : childQueues) { + if (candidateQueue == null || + comparator.compare(queue, candidateQueue) > 0) { + candidateQueue = queue; + } + } + + // Let the selected queue choose which of its container to preempt + if (candidateQueue != null) { + toBePreempted = candidateQueue.preemptContainer(); + } + return toBePreempted; + } + @Override public List getChildQueues() { return childQueues; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index 1e94046100a..716e1ee6874 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -187,4 +187,17 @@ public abstract class FSQueue extends Schedulable implements Queue { } return true; } + + /** + * Helper method to check if the queue should preempt containers + * + * @return true if check passes (can preempt) or false otherwise + */ + protected boolean preemptContainerPreCheck() { + if (this == scheduler.getQueueManager().getRootQueue()) { + return true; + } + return parent.getPolicy() + .checkIfUsageOverFairShare(getResourceUsage(), getFairShare()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java index adabfefaee1..63a29e4b099 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java @@ -59,6 +59,8 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt { private AppSchedulable appSchedulable; final Map preemptionMap = new HashMap(); + + private Resource preemptedResources = Resources.createResource(0); public FSSchedulerApp(ApplicationAttemptId applicationAttemptId, String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager, @@ -316,6 +318,7 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt { public void addPreemption(RMContainer container, long time) { assert preemptionMap.get(container) == null; preemptionMap.put(container, time); + Resources.addTo(preemptedResources, container.getAllocatedResource()); } public Long getContainerPreemptionTime(RMContainer container) { @@ -330,4 +333,20 @@ public class FSSchedulerApp extends SchedulerApplicationAttempt { public FSLeafQueue getQueue() { return (FSLeafQueue)super.getQueue(); } + + public Resource getPreemptedResources() { + return preemptedResources; + } + + public void resetPreemptedResources() { + preemptedResources = Resources.createResource(0); + for (RMContainer container : getPreemptionContainers()) { + Resources.addTo(preemptedResources, container.getAllocatedResource()); + } + } + + public void clearPreemptedResources() { + preemptedResources.setMemory(0); + preemptedResources.setVirtualCores(0); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 830f6f75099..6d71ea2fbb3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -20,14 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -337,94 +334,78 @@ public class FairScheduler extends } if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, resToPreempt, Resources.none())) { - preemptResources(queueMgr.getLeafQueues(), resToPreempt); + preemptResources(resToPreempt); } } /** - * Preempt a quantity of resources from a list of QueueSchedulables. The - * policy for this is to pick apps from queues that are over their fair share, - * but make sure that no queue is placed below its fair share in the process. - * We further prioritize preemption by choosing containers with lowest - * priority to preempt. + * Preempt a quantity of resources. Each round, we start from the root queue, + * level-by-level, until choosing a candidate application. + * The policy for prioritizing preemption for each queue depends on its + * SchedulingPolicy: (1) fairshare/DRF, choose the ChildSchedulable that is + * most over its fair share; (2) FIFO, choose the childSchedulable that is + * latest launched. + * Inside each application, we further prioritize preemption by choosing + * containers with lowest priority to preempt. + * We make sure that no queue is placed below its fair share in the process. */ - protected void preemptResources(Collection scheds, - Resource toPreempt) { - if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) { + protected void preemptResources(Resource toPreempt) { + if (Resources.equals(toPreempt, Resources.none())) { return; } - Map apps = - new HashMap(); - Map queues = - new HashMap(); - - // Collect running containers from over-scheduled queues - List runningContainers = new ArrayList(); - for (FSLeafQueue sched : scheds) { - if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), sched.getFairShare())) { - for (AppSchedulable as : sched.getRunnableAppSchedulables()) { - for (RMContainer c : as.getApp().getLiveContainers()) { - runningContainers.add(c); - apps.put(c, as.getApp()); - queues.put(c, sched); - } - } - } - } - - // Sort containers into reverse order of priority - Collections.sort(runningContainers, new Comparator() { - public int compare(RMContainer c1, RMContainer c2) { - int ret = c1.getContainer().getPriority().compareTo( - c2.getContainer().getPriority()); - if (ret == 0) { - return c2.getContainerId().compareTo(c1.getContainerId()); - } - return ret; - } - }); - // Scan down the list of containers we've already warned and kill them // if we need to. Remove any containers from the list that we don't need // or that are no longer running. Iterator warnedIter = warnedContainers.iterator(); - Set preemptedThisRound = new HashSet(); while (warnedIter.hasNext()) { RMContainer container = warnedIter.next(); - if (container.getState() == RMContainerState.RUNNING && + if ((container.getState() == RMContainerState.RUNNING || + container.getState() == RMContainerState.ALLOCATED) && Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, toPreempt, Resources.none())) { - warnOrKillContainer(container, apps.get(container), queues.get(container)); - preemptedThisRound.add(container); + warnOrKillContainer(container); Resources.subtractFrom(toPreempt, container.getContainer().getResource()); } else { warnedIter.remove(); } } - // Scan down the rest of the containers until we've preempted enough, making - // sure we don't preempt too many from any queue - Iterator runningIter = runningContainers.iterator(); - while (runningIter.hasNext() && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, - toPreempt, Resources.none())) { - RMContainer container = runningIter.next(); - FSLeafQueue sched = queues.get(container); - if (!preemptedThisRound.contains(container) && - Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, - sched.getResourceUsage(), sched.getFairShare())) { - warnOrKillContainer(container, apps.get(container), sched); - - warnedContainers.add(container); - Resources.subtractFrom(toPreempt, container.getContainer().getResource()); + try { + // Reset preemptedResource for each app + for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { + for (AppSchedulable app : queue.getRunnableAppSchedulables()) { + app.getApp().resetPreemptedResources(); + } + } + + while (Resources.greaterThan(RESOURCE_CALCULATOR, clusterResource, + toPreempt, Resources.none())) { + RMContainer container = + getQueueManager().getRootQueue().preemptContainer(); + if (container == null) { + break; + } else { + warnOrKillContainer(container); + warnedContainers.add(container); + Resources.subtractFrom( + toPreempt, container.getContainer().getResource()); + } + } + } finally { + // Clear preemptedResources for each app + for (FSLeafQueue queue : getQueueManager().getLeafQueues()) { + for (AppSchedulable app : queue.getRunnableAppSchedulables()) { + app.getApp().clearPreemptedResources(); + } } } } - private void warnOrKillContainer(RMContainer container, FSSchedulerApp app, - FSLeafQueue queue) { + private void warnOrKillContainer(RMContainer container) { + ApplicationAttemptId appAttemptId = container.getApplicationAttemptId(); + FSSchedulerApp app = getSchedulerApp(appAttemptId); + FSLeafQueue queue = app.getQueue(); LOG.info("Preempting container (prio=" + container.getContainer().getPriority() + "res=" + container.getContainer().getResource() + ") from queue " + queue.getName()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 92b6d3e71ea..4f8ac1e6374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; /** @@ -100,6 +101,11 @@ public abstract class Schedulable { */ public abstract Resource assignContainer(FSSchedulerNode node); + /** + * Preempt a container from this Schedulable if possible. + */ + public abstract RMContainer preemptContainer(); + /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare) { this.fairShare = fairShare; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 549b85c380f..1d77a43ce75 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -139,4 +139,14 @@ public abstract class SchedulingPolicy { */ public abstract void computeShares( Collection schedulables, Resource totalResources); + + /** + * Check if the resource usage is over the fair share under this policy + * + * @param usage {@link Resource} the resource usage + * @param fairShare {@link Resource} the fair share + * @return true if check passes (is over) or false otherwise + */ + public abstract boolean checkIfUsageOverFairShare( + Resource usage, Resource fairShare); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index f5b84177229..4b663d95de8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -69,6 +69,11 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy { } } + @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + return !Resources.fitsIn(usage, fairShare); + } + @Override public void initialize(Resource clusterCapacity) { comparator.setClusterCapacity(clusterCapacity); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index fbad1012676..ca7297ff46c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -119,6 +119,11 @@ public class FairSharePolicy extends SchedulingPolicy { ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY); } + @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare); + } + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_ANY; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 3451cfea4c5..d9969446811 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies; import java.io.Serializable; import java.util.Collection; import java.util.Comparator; -import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -88,6 +87,13 @@ public class FifoPolicy extends SchedulingPolicy { earliest.setFairShare(Resources.clone(totalResources)); } + @Override + public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { + throw new UnsupportedOperationException( + "FifoPolicy doesn't support checkIfUsageOverFairshare operation, " + + "as FifoPolicy only works for FSLeafQueue."); + } + @Override public byte getApplicableDepth() { return SchedulingPolicy.DEPTH_LEAF; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index d0ba0d8e085..dcfc2d3aa2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.Records; import org.apache.hadoop.yarn.util.resource.Resources; @@ -83,6 +84,11 @@ public class FakeSchedulable extends Schedulable { return null; } + @Override + public RMContainer preemptContainer() { + return null; + } + @Override public Resource getDemand() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index 276020f00c9..bda9564f196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -1029,13 +1029,13 @@ public class TestFairScheduler extends FairSchedulerTestBase { @Test (timeout = 5000) /** - * Make sure containers are chosen to be preempted in the correct order. Right - * now this means decreasing order of priority. + * Make sure containers are chosen to be preempted in the correct order. */ public void testChoiceOfPreemptedContainers() throws Exception { conf.setLong(FairSchedulerConfiguration.PREEMPTION_INTERVAL, 5000); - conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); + conf.setLong(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 10000); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE + ".allocation.file", ALLOC_FILE); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false"); MockClock clock = new MockClock(); scheduler.setClock(clock); @@ -1052,7 +1052,7 @@ public class TestFairScheduler extends FairSchedulerTestBase { out.println(""); out.println(".25"); out.println(""); - out.println(""); + out.println(""); out.println(".25"); out.println(""); out.println(""); @@ -1060,133 +1060,132 @@ public class TestFairScheduler extends FairSchedulerTestBase { scheduler.reinitialize(conf, resourceManager.getRMContext()); - // Create four nodes + // Create two nodes RMNode node1 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 1, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 1, "127.0.0.1"); NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); scheduler.handle(nodeEvent1); RMNode node2 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 2, + MockNodes.newNodeInfo(1, Resources.createResource(4 * 1024, 4), 2, "127.0.0.2"); NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2); scheduler.handle(nodeEvent2); - RMNode node3 = - MockNodes.newNodeInfo(1, Resources.createResource(2 * 1024, 2), 3, - "127.0.0.3"); - NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3); - scheduler.handle(nodeEvent3); - - - // Queue A and B each request three containers + // Queue A and B each request two applications ApplicationAttemptId app1 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app1); ApplicationAttemptId app2 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 2); - ApplicationAttemptId app3 = - createSchedulingRequest(1 * 1024, "queueA", "user1", 1, 3); + createSchedulingRequest(1 * 1024, 1, "queueA", "user1", 1, 3); + createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app2); + ApplicationAttemptId app3 = + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 1); + createSchedulingRequestExistingApplication(1 * 1024, 1, 2, app3); ApplicationAttemptId app4 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 1); - ApplicationAttemptId app5 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 2); - ApplicationAttemptId app6 = - createSchedulingRequest(1 * 1024, "queueB", "user1", 1, 3); + createSchedulingRequest(1 * 1024, 1, "queueB", "user1", 1, 3); + createSchedulingRequestExistingApplication(1 * 1024, 1, 4, app4); scheduler.update(); + scheduler.getQueueManager().getLeafQueue("queueA", true) + .setPolicy(SchedulingPolicy.parse("fifo")); + scheduler.getQueueManager().getLeafQueue("queueB", true) + .setPolicy(SchedulingPolicy.parse("fair")); + // Sufficient node check-ins to fully schedule containers - for (int i = 0; i < 2; i++) { - NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate1 = new NodeUpdateSchedulerEvent(node1); + NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); + for (int i = 0; i < 4; i++) { scheduler.handle(nodeUpdate1); - - NodeUpdateSchedulerEvent nodeUpdate2 = new NodeUpdateSchedulerEvent(node2); scheduler.handle(nodeUpdate2); - - NodeUpdateSchedulerEvent nodeUpdate3 = new NodeUpdateSchedulerEvent(node3); - scheduler.handle(nodeUpdate3); } - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - - // Now new requests arrive from queues C and D - ApplicationAttemptId app7 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 1); - ApplicationAttemptId app8 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 2); - ApplicationAttemptId app9 = - createSchedulingRequest(1 * 1024, "queueC", "user1", 1, 3); - - ApplicationAttemptId app10 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 1); - ApplicationAttemptId app11 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 2); - ApplicationAttemptId app12 = - createSchedulingRequest(1 * 1024, "queueD", "user1", 1, 3); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + // Now new requests arrive from queueC and default + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "queueC", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); + createSchedulingRequest(1 * 1024, 1, "default", "user1", 1, 1); scheduler.update(); - // We should be able to claw back one container from A and B each. - // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - - // First verify we are adding containers to preemption list for the application - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app3).getLiveContainers(), - scheduler.getSchedulerApp(app3).getPreemptionContainers())); - assertTrue(!Collections.disjoint(scheduler.getSchedulerApp(app6).getLiveContainers(), - scheduler.getSchedulerApp(app6).getPreemptionContainers())); + // We should be able to claw back one container from queueA and queueB each. + scheduler.preemptResources(Resources.createResource(2 * 1024)); + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + assertEquals(2, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + + // First verify we are adding containers to preemption list for the app. + // For queueA (fifo), app2 is selected. + // For queueB (fair), app4 is selected. + assertTrue("App2 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); + assertTrue("App4 should have container to be preempted", + !Collections.disjoint( + scheduler.getSchedulerApp(app2).getLiveContainers(), + scheduler.getSchedulerApp(app2).getPreemptionContainers())); // Pretend 15 seconds have passed clock.tick(15); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); + scheduler.preemptResources(Resources.createResource(2 * 1024)); // At this point the containers should have been killed (since we are not simulating AM) - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app2).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); + // Inside each app, containers are sorted according to their priorities. + // Containers with priority 4 are preempted for app2 and app4. + Set set = new HashSet(); + for (RMContainer container : + scheduler.getSchedulerApp(app2).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + for (RMContainer container : + scheduler.getSchedulerApp(app4).getLiveContainers()) { + if (container.getAllocatedPriority().getPriority() == 4) { + set.add(container); + } + } + assertTrue("Containers with priority=4 in app2 and app4 should be " + + "preempted.", set.isEmpty()); // Trigger a kill by insisting we want containers back - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); + scheduler.preemptResources(Resources.createResource(2 * 1024)); // Pretend 15 seconds have passed clock.tick(15); // We should be able to claw back another container from A and B each. - // Make sure it is lowest priority container. - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); + // For queueA (fifo), continue preempting from app2. + // For queueB (fair), even app4 has a lowest priority container with p=4, it + // still preempts from app3 as app3 is most over fair share. + scheduler.preemptResources(Resources.createResource(2 * 1024)); + + assertEquals(2, scheduler.getSchedulerApp(app1).getLiveContainers().size()); assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); + assertEquals(1, scheduler.getSchedulerApp(app3).getLiveContainers().size()); assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); // Now A and B are below fair share, so preemption shouldn't do anything - scheduler.preemptResources(scheduler.getQueueManager().getLeafQueues(), - Resources.createResource(2 * 1024)); - assertEquals(1, scheduler.getSchedulerApp(app1).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app2).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app3).getLiveContainers().size()); - assertEquals(1, scheduler.getSchedulerApp(app4).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app5).getLiveContainers().size()); - assertEquals(0, scheduler.getSchedulerApp(app6).getLiveContainers().size()); + scheduler.preemptResources(Resources.createResource(2 * 1024)); + assertTrue("App1 should have no container to be preempted", + scheduler.getSchedulerApp(app1).getPreemptionContainers().isEmpty()); + assertTrue("App2 should have no container to be preempted", + scheduler.getSchedulerApp(app2).getPreemptionContainers().isEmpty()); + assertTrue("App3 should have no container to be preempted", + scheduler.getSchedulerApp(app3).getPreemptionContainers().isEmpty()); + assertTrue("App4 should have no container to be preempted", + scheduler.getSchedulerApp(app4).getPreemptionContainers().isEmpty()); } @Test (timeout = 5000) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java index b3ab299ea88..2098e1679b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerPreemption.java @@ -35,10 +35,8 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.PrintWriter; -import java.util.Collection; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; public class TestFairSchedulerPreemption extends FairSchedulerTestBase { @@ -51,8 +49,7 @@ public class TestFairSchedulerPreemption extends FairSchedulerTestBase { public int lastPreemptMemory = -1; @Override - protected void preemptResources( - Collection scheds, Resource toPreempt) { + protected void preemptResources(Resource toPreempt) { lastPreemptMemory = toPreempt.getMemory(); }