From 21843592db7bef75d4c0320e5ed7f932c190974a Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Fri, 22 Aug 2014 16:00:57 +0000 Subject: [PATCH] YARN-2393. FairScheduler: Add the notion of steady fair share. (Wei Yan via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1619852 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/fair/FSAppAttempt.java | 6 - .../scheduler/fair/FSParentQueue.java | 11 +- .../scheduler/fair/FSQueue.java | 19 ++- .../scheduler/fair/FSQueueMetrics.java | 17 ++- .../scheduler/fair/FairScheduler.java | 4 + .../scheduler/fair/QueueManager.java | 10 +- .../scheduler/fair/Schedulable.java | 7 - .../scheduler/fair/SchedulingPolicy.java | 27 +++- .../fair/policies/ComputeFairShares.java | 34 ++++- .../DominantResourceFairnessPolicy.java | 9 ++ .../fair/policies/FairSharePolicy.java | 8 + .../scheduler/fair/policies/FifoPolicy.java | 8 + .../scheduler/fair/FakeSchedulable.java | 5 - .../scheduler/fair/TestFairScheduler.java | 139 +++++++++++++++++- .../fair/TestFairSchedulerFairShare.java | 68 ++++++++- 16 files changed, 328 insertions(+), 47 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 026d54b2d8e..fa756d31d3a 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -35,6 +35,9 @@ Release 2.6.0 - UNRELEASED YARN-2174. Enable HTTPs for the writer REST API of TimelineServer. (Zhijie Shen via jianhe) + YARN-2393. FairScheduler: Add the notion of steady fair share. + (Wei Yan via kasha) + IMPROVEMENTS YARN-2242. Improve exception information on AM launch crashes. (Li Lu diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index eb6f6413893..bf543768f8c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -717,12 +717,6 @@ public void setFairShare(Resource fairShare) { this.fairShare = fairShare; } - @Override - public boolean isActive() { - return true; - } - - @Override public void updateDemand() { demand = Resources.createResource(0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java index 9af72a511e0..26a706c7f03 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -68,6 +67,16 @@ public void recomputeShares() { } } + public void recomputeSteadyShares() { + policy.computeSteadyShares(childQueues, getSteadyFairShare()); + for (FSQueue childQueue : childQueues) { + childQueue.getMetrics().setSteadyFairShare(childQueue.getSteadyFairShare()); + if (childQueue instanceof FSParentQueue) { + ((FSParentQueue) childQueue).recomputeSteadyShares(); + } + } + } + @Override public Resource getDemand() { return demand; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java index c071c73321d..00f0795e1da 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java @@ -41,6 +41,7 @@ @Unstable public abstract class FSQueue implements Queue, Schedulable { private Resource fairShare = Resources.createResource(0, 0); + private Resource steadyFairShare = Resources.createResource(0, 0); private final String name; protected final FairScheduler scheduler; private final FSQueueMetrics metrics; @@ -151,7 +152,17 @@ public void setFairShare(Resource fairShare) { this.fairShare = fairShare; metrics.setFairShare(fairShare); } - + + /** Get the steady fair share assigned to this Schedulable. */ + public Resource getSteadyFairShare() { + return steadyFairShare; + } + + public void setSteadyFairShare(Resource steadyFairShare) { + this.steadyFairShare = steadyFairShare; + metrics.setSteadyFairShare(steadyFairShare); + } + public boolean hasAccess(QueueACL acl, UserGroupInformation user) { return scheduler.getAllocationConfiguration().hasAccess(name, acl, user); } @@ -161,7 +172,7 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) { * queue's current share */ public abstract void recomputeShares(); - + /** * Gets the children of this queue, if any. */ @@ -194,7 +205,9 @@ protected boolean assignContainerPreCheck(FSSchedulerNode node) { return true; } - @Override + /** + * Returns true if queue has at least one app running. + */ public boolean isActive() { return getNumRunnableApps() > 0; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index ff0956e5f74..82c422b8207 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -33,6 +33,8 @@ public class FSQueueMetrics extends QueueMetrics { @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB; @Metric("Fair share of CPU in vcores") MutableGaugeInt fairShareVCores; + @Metric("Steady fair share of memory in MB") MutableGaugeInt steadyFairShareMB; + @Metric("Steady fair share of CPU in vcores") MutableGaugeInt steadyFairShareVCores; @Metric("Minimum share of memory in MB") MutableGaugeInt minShareMB; @Metric("Minimum share of CPU in vcores") MutableGaugeInt minShareVCores; @Metric("Maximum share of memory in MB") MutableGaugeInt maxShareMB; @@ -55,7 +57,20 @@ public int getFairShareMB() { public int getFairShareVirtualCores() { return fairShareVCores.value(); } - + + public void setSteadyFairShare(Resource resource) { + steadyFairShareMB.set(resource.getMemory()); + steadyFairShareVCores.set(resource.getVirtualCores()); + } + + public int getSteadyFairShareMB() { + return steadyFairShareMB.value(); + } + + public int getSteadyFairShareVCores() { + return steadyFairShareVCores.value(); + } + public void setMinShare(Resource resource) { minShareMB.set(resource.getMemory()); minShareVCores.set(resource.getVirtualCores()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 0fcbad670e5..40c72a621e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -851,6 +851,8 @@ private synchronized void addNode(RMNode node) { Resources.addTo(clusterResource, node.getTotalCapability()); updateRootQueueMetrics(); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Added node " + node.getNodeAddress() + " cluster capacity: " + clusterResource); } @@ -885,6 +887,8 @@ private synchronized void removeNode(RMNode rmNode) { } nodes.remove(rmNode.getNodeID()); + queueMgr.getRootQueue().setSteadyFairShare(clusterResource); + queueMgr.getRootQueue().recomputeSteadyShares(); LOG.info("Removed node " + rmNode.getNodeAddress() + " cluster capacity: " + clusterResource); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 4f8735bbf03..490ba686598 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -118,6 +118,11 @@ private FSQueue getQueue(String name, boolean create, FSQueueType queueType) { if (queue == null && create) { // if the queue doesn't exist,create it and return queue = createQueue(name, queueType); + + // Update steady fair share for all queues + if (queue != null) { + rootQueue.recomputeSteadyShares(); + } } return queue; } @@ -190,7 +195,7 @@ private FSQueue createQueue(String name, FSQueueType queueType) { parent = newParent; } } - + return parent; } @@ -376,5 +381,8 @@ public void updateAllocationConfiguration(AllocationConfiguration queueConf) { + queue.getName(), ex); } } + + // Update steady fair shares for all queues + rootQueue.recomputeSteadyShares(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java index 122b986defc..289887f63c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java @@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.util.resource.Resources; /** * A Schedulable represents an entity that can be scheduled such as an @@ -102,10 +101,4 @@ public interface Schedulable { /** Assign a fair share to this Schedulable. */ public void setFairShare(Resource fairShare); - - /** - * Returns true if queue has atleast one app running. Always returns true for - * AppSchedulables. - */ - public boolean isActive(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java index 1087c73aa19..ca006c580ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java @@ -17,10 +17,6 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; -import java.util.Collection; -import java.util.Comparator; -import java.util.concurrent.ConcurrentHashMap; - import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.util.ReflectionUtils; @@ -29,6 +25,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import java.util.Collection; +import java.util.Comparator; +import java.util.concurrent.ConcurrentHashMap; + @Public @Evolving public abstract class SchedulingPolicy { @@ -131,8 +131,10 @@ public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) { public abstract Comparator getComparator(); /** - * Computes and updates the shares of {@link Schedulable}s as per the - * {@link SchedulingPolicy}, to be used later at schedule time. + * Computes and updates the shares of {@link Schedulable}s as per + * the {@link SchedulingPolicy}, to be used later for scheduling decisions. + * The shares computed are instantaneous and only consider queues with + * running applications. * * @param schedulables {@link Schedulable}s whose shares are to be updated * @param totalResources Total {@link Resource}s in the cluster @@ -140,6 +142,19 @@ public static boolean isApplicableTo(SchedulingPolicy policy, byte depth) { public abstract void computeShares( Collection schedulables, Resource totalResources); + /** + * Computes and updates the steady shares of {@link FSQueue}s as per the + * {@link SchedulingPolicy}. The steady share does not differentiate + * between queues with and without running applications under them. The + * steady share is not used for scheduling, it is displayed on the Web UI + * for better visibility. + * + * @param queues {@link FSQueue}s whose shares are to be updated + * @param totalResources Total {@link Resource}s in the cluster + */ + public abstract void computeSteadyShares( + Collection queues, Resource totalResources); + /** * Check if the resource usage is over the fair share under this policy * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java index 6363ec0218c..6836758019b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; /** @@ -49,14 +50,29 @@ public static void computeShares( ResourceType type) { Collection activeSchedulables = new ArrayList(); for (Schedulable sched : schedulables) { - if (sched.isActive()) { - activeSchedulables.add(sched); - } else { + if ((sched instanceof FSQueue) && !((FSQueue) sched).isActive()) { setResourceValue(0, sched.getFairShare(), type); + } else { + activeSchedulables.add(sched); } } - computeSharesInternal(activeSchedulables, totalResources, type); + computeSharesInternal(activeSchedulables, totalResources, type, false); + } + + /** + * Compute the steady fair share of the given queues. The steady fair + * share is an allocation of shares considering all queues, i.e., + * active and inactive. + * + * @param queues + * @param totalResources + * @param type + */ + public static void computeSteadyShares( + Collection queues, Resource totalResources, + ResourceType type) { + computeSharesInternal(queues, totalResources, type, true); } /** @@ -102,7 +118,7 @@ public static void computeShares( */ private static void computeSharesInternal( Collection schedulables, Resource totalResources, - ResourceType type) { + ResourceType type, boolean isSteadyShare) { if (schedulables.isEmpty()) { return; } @@ -145,7 +161,13 @@ private static void computeSharesInternal( } // Set the fair shares based on the value of R we've converged to for (Schedulable sched : schedulables) { - setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type); + if (isSteadyShare) { + setResourceValue(computeShare(sched, right, type), + ((FSQueue) sched).getSteadyFairShare(), type); + } else { + setResourceValue( + computeShare(sched, right, type), sched.getFairShare(), type); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java index af674b96056..42044bcaac1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.util.resource.Resources; @@ -68,6 +69,14 @@ public void computeShares(Collection schedulables, ComputeFairShares.computeShares(schedulables, totalResources, type); } } + + @Override + public void computeSteadyShares(Collection queues, + Resource totalResources) { + for (ResourceType type : ResourceType.values()) { + ComputeFairShares.computeSteadyShares(queues, totalResources, type); + } + } @Override public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index c51852fa9d6..66bb88bf16c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -25,6 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; @@ -119,6 +120,13 @@ public void computeShares(Collection schedulables, ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY); } + @Override + public void computeSteadyShares(Collection queues, + Resource totalResources) { + ComputeFairShares.computeSteadyShares(queues, totalResources, + ResourceType.MEMORY); + } + @Override public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { return Resources.greaterThan(RESOURCE_CALCULATOR, null, usage, fairShare); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java index 0f4309759d4..591ee4936b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; import org.apache.hadoop.yarn.util.resource.Resources; @@ -87,6 +88,13 @@ public void computeShares(Collection schedulables, earliest.setFairShare(Resources.clone(totalResources)); } + @Override + public void computeSteadyShares(Collection queues, + Resource totalResources) { + // Nothing needs to do, as leaf queue doesn't have to calculate steady + // fair shares for applications. + } + @Override public boolean checkIfUsageOverFairShare(Resource usage, Resource fairShare) { throw new UnsupportedOperationException( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java index 5bd52ab7a07..5a170cf2c5a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java @@ -100,11 +100,6 @@ public void setFairShare(Resource fairShare) { this.fairShare = fairShare; } - @Override - public boolean isActive() { - return true; - } - @Override public Resource getDemand() { return null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index fb158730082..f4b059f5d4f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -292,14 +292,19 @@ public void testSimpleFairShareCalculation() throws IOException { createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); + scheduler.getQueueManager().getRootQueue() + .setSteadyFairShare(scheduler.getClusterResource()); + scheduler.getQueueManager().getRootQueue().recomputeSteadyShares(); Collection queues = scheduler.getQueueManager().getLeafQueues(); assertEquals(3, queues.size()); - // Divided three ways - betwen the two queues and the default queue + // Divided three ways - between the two queues and the default queue for (FSLeafQueue p : queues) { assertEquals(3414, p.getFairShare().getMemory()); assertEquals(3414, p.getMetrics().getFairShareMB()); + assertEquals(3414, p.getSteadyFairShare().getMemory()); + assertEquals(3414, p.getMetrics().getSteadyFairShareMB()); } } @@ -323,6 +328,9 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException { createSchedulingRequest(10 * 1024, "root.default", "user1"); scheduler.update(); + scheduler.getQueueManager().getRootQueue() + .setSteadyFairShare(scheduler.getClusterResource()); + scheduler.getQueueManager().getRootQueue().recomputeSteadyShares(); QueueManager queueManager = scheduler.getQueueManager(); Collection queues = queueManager.getLeafQueues(); @@ -333,10 +341,16 @@ public void testSimpleHierarchicalFairShareCalculation() throws IOException { FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3", true); assertEquals(capacity / 2, queue1.getFairShare().getMemory()); assertEquals(capacity / 2, queue1.getMetrics().getFairShareMB()); + assertEquals(capacity / 2, queue1.getSteadyFairShare().getMemory()); + assertEquals(capacity / 2, queue1.getMetrics().getSteadyFairShareMB()); assertEquals(capacity / 4, queue2.getFairShare().getMemory()); assertEquals(capacity / 4, queue2.getMetrics().getFairShareMB()); + assertEquals(capacity / 4, queue2.getSteadyFairShare().getMemory()); + assertEquals(capacity / 4, queue2.getMetrics().getSteadyFairShareMB()); assertEquals(capacity / 4, queue3.getFairShare().getMemory()); assertEquals(capacity / 4, queue3.getMetrics().getFairShareMB()); + assertEquals(capacity / 4, queue3.getSteadyFairShare().getMemory()); + assertEquals(capacity / 4, queue3.getMetrics().getSteadyFairShareMB()); } @Test @@ -771,6 +785,9 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception { createSchedulingRequest(10 * 1024, "root.default", "user3"); scheduler.update(); + scheduler.getQueueManager().getRootQueue() + .setSteadyFairShare(scheduler.getClusterResource()); + scheduler.getQueueManager().getRootQueue().recomputeSteadyShares(); Collection leafQueues = scheduler.getQueueManager() .getLeafQueues(); @@ -780,12 +797,128 @@ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception { || leaf.getName().equals("root.parentq.user2")) { // assert that the fair share is 1/4th node1's capacity assertEquals(capacity / 4, leaf.getFairShare().getMemory()); + // assert that the steady fair share is 1/4th node1's capacity + assertEquals(capacity / 4, leaf.getSteadyFairShare().getMemory()); // assert weights are equal for both the user queues assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0); } } } - + + @Test + public void testSteadyFairShareWithReloadAndNodeAddRemove() throws Exception { + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("fair"); + out.println(""); + out.println(" drf"); + out.println(" "); + out.println(" 1"); + out.println(" "); + out.println(" "); + out.println(" 1"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // The steady fair share for all queues should be 0 + QueueManager queueManager = scheduler.getQueueManager(); + assertEquals(0, queueManager.getLeafQueue("child1", false) + .getSteadyFairShare().getMemory()); + assertEquals(0, queueManager.getLeafQueue("child2", false) + .getSteadyFairShare().getMemory()); + + // Add one node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + assertEquals(6144, scheduler.getClusterResource().getMemory()); + + // The steady fair shares for all queues should be updated + assertEquals(2048, queueManager.getLeafQueue("child1", false) + .getSteadyFairShare().getMemory()); + assertEquals(2048, queueManager.getLeafQueue("child2", false) + .getSteadyFairShare().getMemory()); + + // Reload the allocation configuration file + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(""); + out.println(""); + out.println("fair"); + out.println(""); + out.println(" drf"); + out.println(" "); + out.println(" 1"); + out.println(" "); + out.println(" "); + out.println(" 2"); + out.println(" "); + out.println(" "); + out.println(" 2"); + out.println(" "); + out.println(""); + out.println(""); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // The steady fair shares for all queues should be updated + assertEquals(1024, queueManager.getLeafQueue("child1", false) + .getSteadyFairShare().getMemory()); + assertEquals(2048, queueManager.getLeafQueue("child2", false) + .getSteadyFairShare().getMemory()); + assertEquals(2048, queueManager.getLeafQueue("child3", false) + .getSteadyFairShare().getMemory()); + + // Remove the node, steady fair shares should back to 0 + NodeRemovedSchedulerEvent nodeEvent2 = new NodeRemovedSchedulerEvent(node1); + scheduler.handle(nodeEvent2); + assertEquals(0, scheduler.getClusterResource().getMemory()); + assertEquals(0, queueManager.getLeafQueue("child1", false) + .getSteadyFairShare().getMemory()); + assertEquals(0, queueManager.getLeafQueue("child2", false) + .getSteadyFairShare().getMemory()); + } + + @Test + public void testSteadyFairShareWithQueueCreatedRuntime() throws Exception { + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "true"); + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add one node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(6144), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + assertEquals(6144, scheduler.getClusterResource().getMemory()); + assertEquals(6144, scheduler.getQueueManager().getRootQueue() + .getSteadyFairShare().getMemory()); + assertEquals(6144, scheduler.getQueueManager() + .getLeafQueue("default", false).getSteadyFairShare().getMemory()); + + // Submit one application + ApplicationAttemptId appAttemptId1 = createAppAttemptId(1, 1); + createApplicationWithAMResource(appAttemptId1, "default", "user1", null); + assertEquals(3072, scheduler.getQueueManager() + .getLeafQueue("default", false).getSteadyFairShare().getMemory()); + assertEquals(3072, scheduler.getQueueManager() + .getLeafQueue("user1", false).getSteadyFairShare().getMemory()); + } + /** * Make allocation requests and ensure they are reflected in queue demand. */ @@ -873,7 +1006,7 @@ public void testAppAdditionAndRemoval() throws Exception { } @Test - public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException, + public void testHierarchicalQueueAllocationFileParsing() throws IOException, SAXException, AllocationConfigurationException, ParserConfigurationException { conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java index 8b8ce93b506..ab8fcbc2b56 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerFairShare.java @@ -109,13 +109,15 @@ public void testFairShareNoAppsRunning() throws IOException { for (FSLeafQueue leaf : leafQueues) { if (leaf.getName().startsWith("root.parentA")) { - assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity - * 100, 0); + assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity, + 0); } else if (leaf.getName().startsWith("root.parentB")) { - assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity - * 100, 0.1); + assertEquals(0, (double) leaf.getFairShare().getMemory() / nodeCapacity, + 0); } } + + verifySteadyFairShareMemory(leafQueues, nodeCapacity); } @Test @@ -135,14 +137,15 @@ public void testFairShareOneAppRunning() throws IOException { 100, (double) scheduler.getQueueManager() .getLeafQueue("root.parentA.childA1", false).getFairShare() - .getMemory() - / nodeCapacity * 100, 0.1); + .getMemory() / nodeCapacity * 100, 0.1); assertEquals( 0, (double) scheduler.getQueueManager() .getLeafQueue("root.parentA.childA2", false).getFairShare() - .getMemory() - / nodeCapacity * 100, 0.1); + .getMemory() / nodeCapacity, 0.1); + + verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(), + nodeCapacity); } @Test @@ -167,6 +170,9 @@ public void testFairShareMultipleActiveQueuesUnderSameParent() .getMemory() / nodeCapacity * 100, .9); } + + verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(), + nodeCapacity); } @Test @@ -206,6 +212,9 @@ public void testFairShareMultipleActiveQueuesUnderDifferentParent() .getLeafQueue("root.parentB.childB1", false).getFairShare() .getMemory() / nodeCapacity * 100, .9); + + verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(), + nodeCapacity); } @Test @@ -253,6 +262,9 @@ public void testFairShareResetsToZeroWhenAppsComplete() throws IOException { .getLeafQueue("root.parentA.childA2", false).getFairShare() .getMemory() / nodeCapacity * 100, 0.1); + + verifySteadyFairShareMemory(scheduler.getQueueManager().getLeafQueues(), + nodeCapacity); } @Test @@ -304,5 +316,45 @@ public void testFairShareWithDRFMultipleActiveQueuesUnderDifferentParent() .getLeafQueue("root.parentB.childB1", false).getFairShare() .getVirtualCores() / nodeVCores * 100, .9); + Collection leafQueues = scheduler.getQueueManager() + .getLeafQueues(); + + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().startsWith("root.parentA")) { + assertEquals(0.2, + (double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001); + assertEquals(0.2, + (double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores, + 0.001); + } else if (leaf.getName().startsWith("root.parentB")) { + assertEquals(0.05, + (double) leaf.getSteadyFairShare().getMemory() / nodeMem, 0.001); + assertEquals(0.1, + (double) leaf.getSteadyFairShare().getVirtualCores() / nodeVCores, + 0.001); + } + } + } + + /** + * Verify whether steady fair shares for all leaf queues still follow + * their weight, not related to active/inactive status. + * + * @param leafQueues + * @param nodeCapacity + */ + private void verifySteadyFairShareMemory(Collection leafQueues, + int nodeCapacity) { + for (FSLeafQueue leaf : leafQueues) { + if (leaf.getName().startsWith("root.parentA")) { + assertEquals(0.2, + (double) leaf.getSteadyFairShare().getMemory() / nodeCapacity, + 0.001); + } else if (leaf.getName().startsWith("root.parentB")) { + assertEquals(0.05, + (double) leaf.getSteadyFairShare().getMemory() / nodeCapacity, + 0.001); + } + } } }