From 620f7b0c02791c02feb8cbc347d5f4ca3d47e62b Mon Sep 17 00:00:00 2001
From: Alejandro Abdelnur
Date: Mon, 24 Jun 2013 18:35:59 +0000
Subject: [PATCH] YARN-736. Add a multi-resource fair sharing metric. (sandyr
 via tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1496154 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-yarn-project/CHANGES.txt                    |   2 +
 .../hadoop/yarn/util/resource/Resources.java       |  37 ++++
 .../scheduler/fair/AppSchedulable.java             |   7 +-
 .../scheduler/fair/FSQueue.java                    |   5 +
 .../scheduler/fair/FairScheduler.java              |   2 +-
 .../scheduler/fair/Schedulable.java                |   2 +
 .../fair/policies/ComputeFairShares.java           | 172 ++++++++++++++++++
 .../DominantResourceFairnessPolicy.java            |   9 +-
 .../fair/policies/FairSharePolicy.java             | 115 +----------
 .../scheduler/fair/policies/FifoPolicy.java        |   9 +-
 .../scheduler/fair/FakeSchedulable.java            |  46 +++--
 .../scheduler/fair/TestComputeFairShares.java      | 167 +++++++++--------
 .../scheduler/fair/TestFairScheduler.java          |  13 +-
 .../TestDominantResourceFairnessPolicy.java        |   5 +-
 .../src/site/apt/FairScheduler.apt.vm              |  36 ++++
 15 files changed, 401 insertions(+), 226 deletions(-)
 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2079365726e..38eabd239e5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -22,6 +22,8 @@ Release 2.2.0 - UNRELEASED
 
     YARN-866. Add test for class ResourceWeights. (ywskycn via tucu)
 
+    YARN-736. Add a multi-resource fair sharing metric. (sandyr via tucu)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 8d299da1eeb..800fa34e570 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -60,6 +60,39 @@ public class Resources {
     }
 
   };
+
+  private static final Resource UNBOUNDED = new Resource() {
+
+    @Override
+    public int getMemory() {
+      return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public void setMemory(int memory) {
+      throw new RuntimeException("UNBOUNDED cannot be modified!");
+    }
+
+    @Override
+    public int getVirtualCores() {
+      return Integer.MAX_VALUE;
+    }
+
+    @Override
+    public void setVirtualCores(int cores) {
+      throw new RuntimeException("UNBOUNDED cannot be modified!");
+    }
+
+    @Override
+    public int compareTo(Resource o) {
+      int diff = Integer.MAX_VALUE - o.getMemory();
+      if (diff == 0) {
+        diff = Integer.MAX_VALUE - o.getVirtualCores();
+      }
+      return diff;
+    }
+
+  };
 
   public static Resource createResource(int memory) {
     return createResource(memory, (memory > 0) ? 1 : 0);
@@ -75,6 +108,10 @@ public class Resources {
   public static Resource none() {
     return NONE;
   }
+
+  public static Resource unbounded() {
+    return UNBOUNDED;
+  }
 
   public static Resource clone(Resource res) {
     return createResource(res.getMemory(), res.getVirtualCores());
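For context: Resources.unbounded() is the max-share counterpart of Resources.none(). Schedulables with no configured maximum now report an effectively infinite cap, so the clamping performed during fair-share computation becomes a no-op for them. A minimal standalone sketch (not part of the patch; hypothetical class name) of the clamping behavior these defaults exist for:

// Standalone sketch (not Hadoop code): models why an immutable "unbounded"
// resource is a safe default max share, just as a zero resource is a safe
// default min share.
public class UnboundedSketch {
  static int clampToShare(int computed, int minShare, int maxShare) {
    // A schedulable's share is always clamped into [minShare, maxShare].
    return Math.min(Math.max(computed, minShare), maxShare);
  }

  public static void main(String[] args) {
    // With minShare = 0 and maxShare = Integer.MAX_VALUE (the defaults an
    // AppSchedulable reports via Resources.none()/unbounded() below), the
    // clamp is a no-op and the weighted share passes through unchanged.
    System.out.println(clampToShare(4096, 0, Integer.MAX_VALUE)); // 4096
    // A queue with configured bounds actually constrains the share.
    System.out.println(clampToShare(4096, 0, 2048));              // 2048
  }
}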
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
index 0844b999b0f..c6c1d2bc826 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
@@ -109,7 +109,12 @@ public class AppSchedulable extends Schedulable {
 
   @Override
   public Resource getMinShare() {
-    return Resources.createResource(0);
+    return Resources.none();
+  }
+
+  @Override
+  public Resource getMaxShare() {
+    return Resources.unbounded();
   }
 
   /**
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index f86d9ef574f..4d8a8df1f13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -93,6 +93,11 @@ public abstract class FSQueue extends Schedulable implements Queue {
   public Resource getMinShare() {
     return queueMgr.getMinResources(getName());
   }
+
+  @Override
+  public Resource getMaxShare() {
+    return queueMgr.getMaxResources(getName());
+  }
 
   @Override
   public long getStartTime() {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index b23b1762cdd..fd1b12276db 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -311,7 +311,7 @@ public class FairScheduler implements ResourceScheduler {
    * defined as being below half its fair share.
   */
  boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.max(RESOURCE_CALCULATOR, clusterCapacity,
+    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterCapacity,
        Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
    return Resources.lessThan(RESOURCE_CALCULATOR, clusterCapacity,
        sched.getResourceUsage(), desiredFairShare);
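The one-character change above (Resources.max to Resources.min) is a genuine bug fix: a queue is starved for fair share only when its usage is below the lesser of half its fair share and its demand. Taking the greater of the two would flag queues whose small demand is already fully satisfied. A standalone sketch (not Hadoop code), reduced to a single scalar resource:

// Standalone sketch (not Hadoop code) of the corrected starvation test.
public class StarvationSketch {
  static boolean isStarvedForFairShare(int usage, int fairShare, int demand) {
    // A queue cannot be starved for more than it demands, hence min(...).
    int desired = Math.min(fairShare / 2, demand);
    return usage < desired;
  }

  public static void main(String[] args) {
    // fairShare=8192, demand=1024, usage=1024: demand fully satisfied.
    // With max(...) this would wrongly report starvation (1024 < 4096).
    System.out.println(isStarvedForFairShare(1024, 8192, 1024)); // false
    // fairShare=8192, demand=8192, usage=1024: genuinely starved.
    System.out.println(isStarvedForFairShare(1024, 8192, 8192)); // true
  }
}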
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
index 9514a240519..92b6d3e71ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
@@ -79,6 +79,8 @@ public abstract class Schedulable {
   /** Minimum Resource share assigned to the schedulable. */
   public abstract Resource getMinShare();
 
+  /** Maximum Resource share assigned to the schedulable. */
+  public abstract Resource getMaxShare();
 
   /** Job/queue weight in fair sharing. */
   public abstract ResourceWeights getWeights();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
new file mode 100644
index 00000000000..c8fd450aa06
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/ComputeFairShares.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies;
+
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
+
+/**
+ * Contains logic for computing fair shares. A {@link Schedulable}'s fair
+ * share is the {@link Resource} it is entitled to, independent of the current
+ * demands and allocations on the cluster. A {@link Schedulable} whose resource
+ * consumption lies at or below its fair share will never have its containers
+ * preempted.
+ */
+public class ComputeFairShares {
+
+  /**
+   * Number of iterations for the binary search below; equivalent to the number
+   * of bits of precision in the result. 25 iterations gives precision better
+   * than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min and max shares of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares to
+   * the Schedulables that respects their min and max shares.
+   *
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of min and max shares. If there
+   * were no minimum or maximum shares, then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum and maximum shares add a
+   * further twist: some Schedulables may have a min share higher than their
+   * assigned share or a max share lower than their assigned share.
+   *
+   * To deal with these possibilities, we define an assignment of slots as
+   * being fair if there exists a ratio R such that:
+   * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+   * - Schedulables S where S.maxShare < R * S.weight are given S.maxShare
+   * - All other Schedulables S are assigned share R * S.weight
+   * - The sum of all the shares is totalSlots.
+   *
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   *
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number of
+   * slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   *
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * use more than totalResources resources). The helper method
+   * resourceUsedWithWeightToResourceRatio computes the total resources used
+   * with a given value of R.
+   *
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because resourceUsedWithWeightToResourceRatio is linear-time and the number
+   * of iterations of binary search is a constant (dependent on desired
+   * precision).
+   */
+  public static void computeShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources,
+      ResourceType type) {
+    if (schedulables.isEmpty()) {
+      return;
+    }
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used all the resources or we
+    // have met all Schedulables' max shares.
+    int totalMaxShare = 0;
+    for (Schedulable sched : schedulables) {
+      int maxShare = getResourceValue(sched.getMaxShare(), type);
+      if (maxShare == Integer.MAX_VALUE) {
+        totalMaxShare = Integer.MAX_VALUE;
+        break;
+      } else {
+        totalMaxShare += maxShare;
+      }
+    }
+    int totalResource = Math.min(totalMaxShare,
+        getResourceValue(totalResources, type));
+
+    double rMax = 1.0;
+    while (resourceUsedWithWeightToResourceRatio(rMax, schedulables, type)
+        < totalResource) {
+      rMax *= 2.0;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (resourceUsedWithWeightToResourceRatio(mid, schedulables, type) <
+          totalResource) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched : schedulables) {
+      setResourceValue(computeShare(sched, right, type), sched.getFairShare(), type);
+    }
+  }
+
+  /**
+   * Compute the resources that would be used given a weight-to-resource ratio
+   * w2rRatio, for use in the binary search performed by
+   * {@link #computeShares(Collection, Resource, ResourceType)}.
+   */
+  private static int resourceUsedWithWeightToResourceRatio(double w2rRatio,
+      Collection<? extends Schedulable> schedulables, ResourceType type) {
+    int resourcesTaken = 0;
+    for (Schedulable sched : schedulables) {
+      int share = computeShare(sched, w2rRatio, type);
+      resourcesTaken += share;
+    }
+    return resourcesTaken;
+  }
+
+  /**
+   * Compute the resources assigned to a Schedulable given a particular
+   * weight-to-resource ratio w2rRatio.
+   */
+  private static int computeShare(Schedulable sched, double w2rRatio,
+      ResourceType type) {
+    double share = sched.getWeights().getWeight(type) * w2rRatio;
+    share = Math.max(share, getResourceValue(sched.getMinShare(), type));
+    share = Math.min(share, getResourceValue(sched.getMaxShare(), type));
+    return (int) share;
+  }
+
+  private static int getResourceValue(Resource resource, ResourceType type) {
+    switch (type) {
+    case MEMORY:
+      return resource.getMemory();
+    case CPU:
+      return resource.getVirtualCores();
+    default:
+      throw new IllegalArgumentException("Invalid resource");
+    }
+  }
+
+  private static void setResourceValue(int val, Resource resource, ResourceType type) {
+    switch (type) {
+    case MEMORY:
+      resource.setMemory(val);
+      break;
+    case CPU:
+      resource.setVirtualCores(val);
+      break;
+    default:
+      throw new IllegalArgumentException("Invalid resource");
+    }
+  }
+}
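To make the binary search concrete, here is a standalone sketch (not part of the patch) that reruns the numbers from testWeightedSharingWithMinShares below: weights {2, 1, 1, 0.5}, min shares {20, 0, 5, 15}, no max shares, and 45 slots. The search converges to a ratio R of 5, yielding shares of 20, 5, 5, and 15:

import java.util.Arrays;

// Standalone sketch (not Hadoop code) of the weight-to-resource-ratio search
// implemented in ComputeFairShares.
public class FairShareSketch {
  static int shareAt(double r, double weight, int min, int max) {
    // Same clamping as ComputeFairShares.computeShare().
    return (int) Math.min(Math.max(weight * r, min), max);
  }

  static int used(double r, double[] weights, int[] mins) {
    int sum = 0;
    for (int i = 0; i < weights.length; i++) {
      sum += shareAt(r, weights[i], mins[i], Integer.MAX_VALUE);
    }
    return sum;
  }

  public static void main(String[] args) {
    double[] weights = {2, 1, 1, 0.5};
    int[] mins = {20, 0, 5, 15};
    int total = 45;
    // Double R until the allocation overshoots, then binary-search the ratio.
    double right = 1.0;
    while (used(right, weights, mins) < total) right *= 2;
    double left = 0;
    for (int i = 0; i < 25; i++) {
      double mid = (left + right) / 2;
      if (used(mid, weights, mins) < total) left = mid; else right = mid;
    }
    int[] shares = new int[weights.length];
    for (int i = 0; i < shares.length; i++) {
      shares[i] = shareAt(right, weights[i], mins[i], Integer.MAX_VALUE);
    }
    System.out.println(Arrays.toString(shares)); // [20, 5, 5, 15]
  }
}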
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
index 9c1178fd1d2..f5b84177229 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/DominantResourceFairnessPolicy.java
@@ -64,13 +64,8 @@ public class DominantResourceFairnessPolicy extends SchedulingPolicy {
   @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
-
-    // TODO: For now, set all fair shares to 0, because, in the context of DRF,
-    // it doesn't make sense to set a value for each resource. YARN-736 should
-    // add in a sensible replacement.
-
-    for (Schedulable schedulable : schedulables) {
-      schedulable.setFairShare(Resources.none());
+    for (ResourceType type : ResourceType.values()) {
+      ComputeFairShares.computeShares(schedulables, totalResources, type);
     }
   }
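With this change, DominantResourceFairnessPolicy runs the single-dimension computation once per ResourceType, so each schedulable ends up with a full (memory, vcores) fair-share vector instead of the previous all-zero placeholder. A standalone sketch (not Hadoop code; the equal two-queue split stands in for the full weighted computation):

// Standalone sketch (not Hadoop code): one independent fair-share
// computation per resource dimension, mirroring the loop added above.
public class DrfShareSketch {
  enum ResourceType { MEMORY, CPU }

  public static void main(String[] args) {
    int[] clusterTotal = {10240, 16}; // 10 GB, 16 vcores; two equal queues
    int[] fairShare = new int[ResourceType.values().length];
    for (ResourceType type : ResourceType.values()) {
      // Each dimension is computed on its own, so a queue's fair share is
      // now a vector rather than a single scalar.
      fairShare[type.ordinal()] = clusterTotal[type.ordinal()] / 2;
    }
    System.out.printf("memory=%d, vcores=%d%n", fairShare[0], fairShare[1]);
    // memory=5120, vcores=8
  }
}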
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
index fec5e96dcfe..fbad1012676 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java
@@ -116,120 +116,7 @@ public class FairSharePolicy extends SchedulingPolicy {
   @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
-    computeFairShares(schedulables, totalResources);
-  }
-
-  /**
-   * Number of iterations for the binary search in computeFairShares. This is
-   * equivalent to the number of bits of precision in the output. 25 iterations
-   * gives precision better than 0.1 slots in clusters with one million slots.
-   */
-  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
-
-  /**
-   * Given a set of Schedulables and a number of slots, compute their weighted
-   * fair shares. The min shares and demands of the Schedulables are assumed to
-   * be set beforehand. We compute the fairest possible allocation of shares to
-   * the Schedulables that respects their min shares and demands.
-   *
-   * To understand what this method does, we must first define what weighted
-   * fair sharing means in the presence of minimum shares and demands. If there
-   * were no minimum shares and every Schedulable had an infinite demand (i.e.
-   * could launch infinitely many tasks), then weighted fair sharing would be
-   * achieved if the ratio of slotsAssigned / weight was equal for each
-   * Schedulable and all slots were assigned. Minimum shares and demands add two
-   * further twists: - Some Schedulables may not have enough tasks to fill all
-   * their share. - Some Schedulables may have a min share higher than their
-   * assigned share.
-   *
-   * To deal with these possibilities, we define an assignment of slots as being
-   * fair if there exists a ratio R such that: - Schedulables S where S.demand <
-   * R * S.weight are assigned share S.demand - Schedulables S where S.minShare
-   * > R * S.weight are given share S.minShare - All other Schedulables S are
-   * assigned share R * S.weight - The sum of all the shares is totalSlots.
-   *
-   * We call R the weight-to-slots ratio because it converts a Schedulable's
-   * weight to the number of slots it is assigned.
-   *
-   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
-   * To do this, we use binary search. Given a ratio R, we compute the number of
-   * slots that would be used in total with this ratio (the sum of the shares
-   * computed using the conditions above). If this number of slots is less than
-   * totalSlots, then R is too small and more slots could be assigned. If the
-   * number of slots is more than totalSlots, then R is too large.
-   *
-   * We begin the binary search with a lower bound on R of 0 (which means that
-   * all Schedulables are only given their minShare) and an upper bound computed
-   * to be large enough that too many slots are given (by doubling R until we
-   * either use more than totalSlots slots or we fulfill all jobs' demands). The
-   * helper method slotsUsedWithWeightToSlotRatio computes the total number of
-   * slots used with a given value of R.
-   *
-   * The running time of this algorithm is linear in the number of Schedulables,
-   * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
-   * iterations of binary search is a constant (dependent on desired precision).
-   */
-  public static void computeFairShares(
-      Collection<? extends Schedulable> schedulables, Resource totalResources) {
-    // Find an upper bound on R that we can use in our binary search. We start
-    // at R = 1 and double it until we have either used totalSlots slots or we
-    // have met all Schedulables' demands (if total demand < totalSlots).
-    Resource totalDemand = Resources.createResource(0);
-    for (Schedulable sched : schedulables) {
-      Resources.addTo(totalDemand, sched.getDemand());
-    }
-    Resource cap = Resources.min(RESOURCE_CALCULATOR, null, totalDemand,
-        totalResources);
-    double rMax = 1.0;
-    while (Resources.lessThan(RESOURCE_CALCULATOR, null,
-        resUsedWithWeightToResRatio(rMax, schedulables),
-        cap)) {
-      rMax *= 2.0;
-    }
-    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
-    double left = 0;
-    double right = rMax;
-    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
-      double mid = (left + right) / 2.0;
-      if (Resources.lessThan(RESOURCE_CALCULATOR, null,
-          resUsedWithWeightToResRatio(mid, schedulables),
-          cap)) {
-        left = mid;
-      } else {
-        right = mid;
-      }
-    }
-    // Set the fair shares based on the value of R we've converged to
-    for (Schedulable sched : schedulables) {
-      sched.setFairShare(computeShare(sched, right));
-    }
-  }
-
-  /**
-   * Compute the number of slots that would be used given a weight-to-slot ratio
-   * w2sRatio, for use in the computeFairShares algorithm as described in #
-   * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
-   */
-  private static Resource resUsedWithWeightToResRatio(double w2sRatio,
-      Collection<? extends Schedulable> schedulables) {
-    Resource slotsTaken = Resources.createResource(0);
-    for (Schedulable sched : schedulables) {
-      Resource share = computeShare(sched, w2sRatio);
-      Resources.addTo(slotsTaken, share);
-    }
-    return slotsTaken;
-  }
-
-  /**
-   * Compute the resources assigned to a Schedulable given a particular
-   * res-to-slot ratio r2sRatio, for use in computeFairShares as described in #
-   * {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
-   */
-  private static Resource computeShare(Schedulable sched, double r2sRatio) {
-    double share = sched.getWeights().getWeight(ResourceType.MEMORY) * r2sRatio;
-    share = Math.max(share, sched.getMinShare().getMemory());
-    share = Math.min(share, sched.getDemand().getMemory());
-    return Resources.createResource((int) share);
+    ComputeFairShares.computeShares(schedulables, totalResources, ResourceType.MEMORY);
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
index 8bc8ae90a35..90e1e6242f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FifoPolicy.java
@@ -73,9 +73,14 @@ public class FifoPolicy extends SchedulingPolicy {
   @Override
   public void computeShares(Collection<? extends Schedulable> schedulables,
       Resource totalResources) {
-    for (Schedulable sched : schedulables) {
-      sched.setFairShare(Resources.createResource(0));
+    Schedulable earliest = null;
+    for (Schedulable schedulable : schedulables) {
+      if (earliest == null ||
+          schedulable.getStartTime() < earliest.getStartTime()) {
+        earliest = schedulable;
+      }
     }
+    if (earliest != null) {
+      earliest.setFairShare(Resources.clone(totalResources));
+    }
   }
 
   @Override
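Under FifoPolicy the full fair share now goes to the earliest-started schedulable, instead of every schedulable receiving zero. (The null check on earliest is a defensive guard for an empty collection, mirroring the empty-list early return in ComputeFairShares.) A standalone sketch (not Hadoop code) of the selection loop:

import java.util.ArrayList;
import java.util.List;

// Standalone sketch (not Hadoop code): FIFO assigns the whole cluster's
// fair share to the schedulable with the earliest start time.
public class FifoShareSketch {
  static class App {
    final String name;
    final long startTime;
    App(String name, long startTime) { this.name = name; this.startTime = startTime; }
  }

  public static void main(String[] args) {
    List<App> apps = new ArrayList<App>();
    apps.add(new App("app2", 2000L));
    apps.add(new App("app1", 1000L));
    apps.add(new App("app3", 3000L));
    App earliest = null;
    for (App app : apps) { // same selection loop as FifoPolicy.computeShares
      if (earliest == null || app.startTime < earliest.startTime) {
        earliest = app;
      }
    }
    System.out.println(earliest.name + " receives the full fair share"); // app1
  }
}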
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
index 0af8cf97025..d0ba0d8e085 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
@@ -28,40 +28,49 @@ import org.apache.hadoop.yarn.util.resource.Resources;
  * Dummy implementation of Schedulable for unit testing.
  */
 public class FakeSchedulable extends Schedulable {
-  private Resource demand;
   private Resource usage;
   private Resource minShare;
+  private Resource maxShare;
   private ResourceWeights weights;
   private Priority priority;
   private long startTime;
   
   public FakeSchedulable() {
-    this(0, 0, 1, 0, 0, 0);
+    this(0, Integer.MAX_VALUE, 1, 0, 0, 0);
   }
   
-  public FakeSchedulable(int demand) {
-    this(demand, 0, 1, 0, 0, 0);
+  public FakeSchedulable(int minShare) {
+    this(minShare, Integer.MAX_VALUE, 1, 0, 0, 0);
   }
   
-  public FakeSchedulable(int demand, int minShare) {
-    this(demand, minShare, 1, 0, 0, 0);
+  public FakeSchedulable(int minShare, int maxShare) {
+    this(minShare, maxShare, 1, 0, 0, 0);
   }
   
-  public FakeSchedulable(int demand, int minShare, double memoryWeight) {
-    this(demand, minShare, memoryWeight, 0, 0, 0);
+  public FakeSchedulable(int minShare, double memoryWeight) {
+    this(minShare, Integer.MAX_VALUE, memoryWeight, 0, 0, 0);
   }
   
-  public FakeSchedulable(int demand, int minShare, double weight, int fairShare, int usage,
+  public FakeSchedulable(int minShare, int maxShare, double memoryWeight) {
+    this(minShare, maxShare, memoryWeight, 0, 0, 0);
+  }
+  
+  public FakeSchedulable(int minShare, int maxShare, double weight, int fairShare, int usage,
       long startTime) {
-    this(Resources.createResource(demand), Resources.createResource(minShare),
-        new ResourceWeights((float)weight), Resources.createResource(fairShare),
-        Resources.createResource(usage), startTime);
+    this(Resources.createResource(minShare, 0), Resources.createResource(maxShare, 0),
+        new ResourceWeights((float)weight), Resources.createResource(fairShare, 0),
+        Resources.createResource(usage, 0), startTime);
   }
   
-  public FakeSchedulable(Resource demand, Resource minShare, ResourceWeights weight,
-      Resource fairShare, Resource usage, long startTime) {
-    this.demand = demand;
+  public FakeSchedulable(Resource minShare, ResourceWeights weights) {
+    this(minShare, Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
+        weights, Resources.createResource(0, 0), Resources.createResource(0, 0), 0);
+  }
+  
+  public FakeSchedulable(Resource minShare, Resource maxShare,
+      ResourceWeights weight, Resource fairShare, Resource usage, long startTime) {
     this.minShare = minShare;
+    this.maxShare = maxShare;
     this.weights = weight;
     setFairShare(fairShare);
     this.usage = usage;
@@ -76,7 +85,7 @@ public class FakeSchedulable extends Schedulable {
   
   @Override
   public Resource getDemand() {
-    return demand;
+    return null;
   }
   
   @Override
@@ -108,6 +117,11 @@ public class FakeSchedulable extends Schedulable {
   public Resource getMinShare() {
     return minShare;
   }
+  
+  @Override
+  public Resource getMaxShare() {
+    return maxShare;
+  }
 
   @Override
   public void updateDemand() {}
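Note the changed constructor convention: the leading int parameters of FakeSchedulable now mean (minShare[, maxShare[, weight]]) rather than the old (demand, minShare[, weight]), which makes the test rewrites below easier to read. A standalone sketch (not Hadoop code) of the convention:

// Standalone sketch (not Hadoop code): models the new FakeSchedulable
// int-constructor meaning, where shares are clamped to [minShare, maxShare].
public class FakeSchedulableConvention {
  static String describe(int minShare, int maxShare) {
    return "share clamped to [" + minShare + ", "
        + (maxShare == Integer.MAX_VALUE ? "unbounded" : String.valueOf(maxShare))
        + "]";
  }

  public static void main(String[] args) {
    System.out.println(describe(0, Integer.MAX_VALUE));  // new FakeSchedulable()
    System.out.println(describe(0, 100));                // new FakeSchedulable(0, 100)
    System.out.println(describe(20, Integer.MAX_VALUE)); // new FakeSchedulable(20)
  }
}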
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
index 219f625b3de..049ba55d096 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestComputeFairShares.java
@@ -23,8 +23,10 @@ import java.util.List;
 
 import junit.framework.Assert;
 
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.ComputeFairShares;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,12 +35,10 @@ import org.junit.Test;
  */
 public class TestComputeFairShares {
   private List<Schedulable> scheds;
-  private SchedulingPolicy schedulingMode;
   
   @Before
   public void setUp() throws Exception {
     scheds = new ArrayList<Schedulable>();
-    schedulingMode = new FairSharePolicy();
   }
 
   /**
@@ -47,13 +47,13 @@ public class TestComputeFairShares {
    */
   @Test
   public void testEqualSharing() {
-    scheds.add(new FakeSchedulable(100));
-    scheds.add(new FakeSchedulable(50));
-    scheds.add(new FakeSchedulable(30));
-    scheds.add(new FakeSchedulable(20));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(40));
-    verifyShares(10, 10, 10, 10);
+    scheds.add(new FakeSchedulable());
+    scheds.add(new FakeSchedulable());
+    scheds.add(new FakeSchedulable());
+    scheds.add(new FakeSchedulable());
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(40), ResourceType.MEMORY);
+    verifyMemoryShares(10, 10, 10, 10);
   }
 
   /**
@@ -64,16 +64,17 @@ public class TestComputeFairShares {
    * so it only gets 11 slots. Pools 1 and 2 split the rest and get 13 each.
    */
   @Test
-  public void testLowDemands() {
-    scheds.add(new FakeSchedulable(100));
-    scheds.add(new FakeSchedulable(50));
-    scheds.add(new FakeSchedulable(11));
-    scheds.add(new FakeSchedulable(3));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(40));
-    verifyShares(13, 13, 11, 3);
+  public void testLowMaxShares() {
+    scheds.add(new FakeSchedulable(0, 100));
+    scheds.add(new FakeSchedulable(0, 50));
+    scheds.add(new FakeSchedulable(0, 11));
+    scheds.add(new FakeSchedulable(0, 3));
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(40), ResourceType.MEMORY);
+    verifyMemoryShares(13, 13, 11, 3);
   }
-  
+
+
   /**
    * In this test, some pools have minimum shares set. Pool 1 has a min share
   * of 20 so it gets 20 slots. Pool 2 also has a min share of 20, but its
@@ -84,13 +85,13 @@ public class TestComputeFairShares {
    */
   @Test
   public void testMinShares() {
-    scheds.add(new FakeSchedulable(100, 20));
-    scheds.add(new FakeSchedulable(10, 20));
-    scheds.add(new FakeSchedulable(10, 0));
-    scheds.add(new FakeSchedulable(3, 2));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(40));
-    verifyShares(20, 10, 7, 3);
+    scheds.add(new FakeSchedulable(20));
+    scheds.add(new FakeSchedulable(18));
+    scheds.add(new FakeSchedulable(0));
+    scheds.add(new FakeSchedulable(2));
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(40), ResourceType.MEMORY);
+    verifyMemoryShares(20, 18, 0, 2);
   }
 
   /**
@@ -99,15 +100,15 @@ public class TestComputeFairShares {
    */
   @Test
   public void testWeightedSharing() {
-    scheds.add(new FakeSchedulable(100, 0, 2.0));
-    scheds.add(new FakeSchedulable(50, 0, 1.0));
-    scheds.add(new FakeSchedulable(30, 0, 1.0));
-    scheds.add(new FakeSchedulable(20, 0, 0.5));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(45));
-    verifyShares(20, 10, 10, 5);
+    scheds.add(new FakeSchedulable(0, 2.0));
+    scheds.add(new FakeSchedulable(0, 1.0));
+    scheds.add(new FakeSchedulable(0, 1.0));
+    scheds.add(new FakeSchedulable(0, 0.5));
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(45), ResourceType.MEMORY);
+    verifyMemoryShares(20, 10, 10, 5);
   }
-  
+
   /**
    * Weighted sharing test where pools 1 and 2 are now given lower demands than
    * above. Pool 1 stops at 10 slots, leaving 35. If the remaining pools split
@@ -116,16 +117,17 @@ public class TestComputeFairShares {
    * the 24 slots left into a 1:0.5 ratio, getting 16 and 8 slots respectively.
    */
   @Test
-  public void testWeightedSharingWithLowDemands() {
-    scheds.add(new FakeSchedulable(10, 0, 2.0));
-    scheds.add(new FakeSchedulable(11, 0, 1.0));
-    scheds.add(new FakeSchedulable(30, 0, 1.0));
-    scheds.add(new FakeSchedulable(20, 0, 0.5));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(45));
-    verifyShares(10, 11, 16, 8);
+  public void testWeightedSharingWithMaxShares() {
+    scheds.add(new FakeSchedulable(0, 10, 2.0));
+    scheds.add(new FakeSchedulable(0, 11, 1.0));
+    scheds.add(new FakeSchedulable(0, 30, 1.0));
+    scheds.add(new FakeSchedulable(0, 20, 0.5));
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(45), ResourceType.MEMORY);
+    verifyMemoryShares(10, 11, 16, 8);
   }
 
+
   /**
    * Weighted fair sharing test with min shares. As in the min share test above,
    * pool 1 has a min share greater than its demand so it only gets its demand.
@@ -135,13 +137,13 @@ public class TestComputeFairShares {
    */
   @Test
   public void testWeightedSharingWithMinShares() {
-    scheds.add(new FakeSchedulable(10, 20, 2.0));
-    scheds.add(new FakeSchedulable(11, 0, 1.0));
-    scheds.add(new FakeSchedulable(30, 5, 1.0));
-    scheds.add(new FakeSchedulable(20, 15, 0.5));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(45));
-    verifyShares(10, 10, 10, 15);
+    scheds.add(new FakeSchedulable(20, 2.0));
+    scheds.add(new FakeSchedulable(0, 1.0));
+    scheds.add(new FakeSchedulable(5, 1.0));
+    scheds.add(new FakeSchedulable(15, 0.5));
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(45), ResourceType.MEMORY);
+    verifyMemoryShares(20, 5, 5, 15);
   }
 
   /**
@@ -151,28 +153,13 @@ public class TestComputeFairShares {
   @Test
   public void testLargeShares() {
     int million = 1000 * 1000;
-    scheds.add(new FakeSchedulable(100 * million));
-    scheds.add(new FakeSchedulable(50 * million));
-    scheds.add(new FakeSchedulable(30 * million));
-    scheds.add(new FakeSchedulable(20 * million));
-    schedulingMode
-        .computeShares(scheds,
-            Resources.createResource(40 * million));
-    verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
-  }
-
-  /**
-   * Test that having a pool with 0 demand doesn't confuse the algorithm.
-   */
-  @Test
-  public void testZeroDemand() {
-    scheds.add(new FakeSchedulable(100));
-    scheds.add(new FakeSchedulable(50));
-    scheds.add(new FakeSchedulable(30));
-    scheds.add(new FakeSchedulable(0));
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(30));
-    verifyShares(10, 10, 10, 0);
+    scheds.add(new FakeSchedulable());
+    scheds.add(new FakeSchedulable());
+    scheds.add(new FakeSchedulable());
+    scheds.add(new FakeSchedulable());
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(40 * million), ResourceType.MEMORY);
+    verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
   }
 
   /**
@@ -180,18 +167,46 @@ public class TestComputeFairShares {
    */
   @Test
   public void testEmptyList() {
-    schedulingMode.computeShares(scheds,
-        Resources.createResource(40));
-    verifyShares();
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(40), ResourceType.MEMORY);
+    verifyMemoryShares();
+  }
+
+  /**
+   * Test that CPU works as well as memory
+   */
+  @Test
+  public void testCPU() {
+    scheds.add(new FakeSchedulable(Resources.createResource(0, 20),
+        new ResourceWeights(2.0f)));
+    scheds.add(new FakeSchedulable(Resources.createResource(0, 0),
+        new ResourceWeights(1.0f)));
+    scheds.add(new FakeSchedulable(Resources.createResource(0, 5),
+        new ResourceWeights(1.0f)));
+    scheds.add(new FakeSchedulable(Resources.createResource(0, 15),
+        new ResourceWeights(0.5f)));
+    ComputeFairShares.computeShares(scheds,
+        Resources.createResource(0, 45), ResourceType.CPU);
+    verifyCPUShares(20, 5, 5, 15);
   }
 
   /**
    * Check that a given list of shares have been assigned to this.scheds.
    */
-  private void verifyShares(double... shares) {
+  private void verifyMemoryShares(int... shares) {
     Assert.assertEquals(scheds.size(), shares.length);
     for (int i = 0; i < shares.length; i++) {
-      Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemory(), 0.01);
+      Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemory());
+    }
+  }
+
+  /**
+   * Check that a given list of shares have been assigned to this.scheds.
+   */
+  private void verifyCPUShares(int... shares) {
+    Assert.assertEquals(scheds.size(), shares.length);
+    for (int i = 0; i < shares.length; i++) {
+      Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores());
     }
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 3f1a5fbf3a3..afd2cab6cdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -374,11 +374,10 @@ public class TestFairScheduler {
     Collection<FSLeafQueue> queues = scheduler.getQueueManager().getLeafQueues();
     assertEquals(3, queues.size());
-    
+
+    // Divided three ways - between the two queues and the default queue
     for (FSLeafQueue p : queues) {
-      if (!p.getName().equals("root.default")) {
-        assertEquals(5120, p.getFairShare().getMemory());
-      }
+      assertEquals(3414, p.getFairShare().getMemory());
     }
   }
 
@@ -393,7 +392,7 @@ public class TestFairScheduler {
     scheduler.handle(nodeEvent1);
 
     // Have two queues which want entire cluster capacity
-    createSchedulingRequest(10 * 1024, "queue1", "user1");
+    createSchedulingRequest(10 * 1024, "default", "user1");
     createSchedulingRequest(10 * 1024, "parent.queue2", "user1");
     createSchedulingRequest(10 * 1024, "parent.queue3", "user1");
 
@@ -401,9 +400,9 @@ public class TestFairScheduler {
     QueueManager queueManager = scheduler.getQueueManager();
     Collection<FSLeafQueue> queues = queueManager.getLeafQueues();
-    assertEquals(4, queues.size());
+    assertEquals(3, queues.size());
 
-    FSLeafQueue queue1 = queueManager.getLeafQueue("queue1");
+    FSLeafQueue queue1 = queueManager.getLeafQueue("default");
     FSLeafQueue queue2 = queueManager.getLeafQueue("parent.queue2");
     FSLeafQueue queue3 = queueManager.getLeafQueue("parent.queue3");
     assertEquals(capacity / 2, queue1.getFairShare().getMemory());
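Why 3414: the cluster in this test appears to offer 10240 MB (the old assertion gave each of two queues 5120), and all three leaf queues, including root.default, which is no longer special-cased to zero, carry equal weight. Shares are integers produced from the converged ratio, and the binary search settles on the smallest integer share S with 3 * S >= 10240. A standalone sketch (not Hadoop code) of that rounding:

// Standalone sketch (not Hadoop code): the expected equal share for three
// equal-weight queues on a 10240 MB cluster. The committed algorithm reaches
// the same value via its binary search over the weight-to-resource ratio.
public class EqualShareSketch {
  public static void main(String[] args) {
    int total = 10240;
    int queues = 3;
    int share = (int) Math.ceil(total / (double) queues);
    System.out.println(share); // 3414
  }
}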
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
index a5088fc40d3..a5c20c1b050 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/TestDominantResourceFairnessPolicy.java
@@ -64,8 +64,9 @@ public class TestDominantResourceFairnessPolicy {
       ResourceWeights weights, int minMemShare, int minCpuShare) {
     Resource usage = BuilderUtils.newResource(memUsage, cpuUsage);
     Resource minShare = BuilderUtils.newResource(minMemShare, minCpuShare);
-    return new FakeSchedulable(Resources.none(), minShare, weights,
-        Resources.none(), usage, 0l);
+    return new FakeSchedulable(minShare,
+        Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE),
+        weights, Resources.none(), usage, 0l);
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
index 96c55f21329..a5adb4e838c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
@@ -257,3 +257,39 @@ Allocation file format
 ---
 
   Note that for backwards compatibility with the original FairScheduler,
   "queue" elements can instead be named as "pool" elements.
+
+* {Administration}
+
+  The fair scheduler provides support for administration at runtime through
+  two mechanisms:
+
+  * It is possible to modify minimum shares, limits, weights, preemption
+    timeouts and queue scheduling policies at runtime by editing the
+    allocation file. The scheduler will reload this file 10-15 seconds after
+    it sees that it was modified.
+
+  * Current applications, queues, and fair shares can be examined through the
+    ResourceManager's web interface, at
+    http://<ResourceManager URL>/cluster/scheduler.
+
+  The following fields can be seen for each queue on the web interface:
+
+  * Used Resources - The sum of resources allocated to containers within the
+    queue.
+
+  * Num Active Applications - The number of applications in the queue that
+    have received at least one container.
+
+  * Num Pending Applications - The number of applications in the queue that
+    have not yet received any containers.
+
+  * Min Resources - The configured minimum resources that are guaranteed to
+    the queue.
+
+  * Max Resources - The configured maximum resources that are allowed to the
+    queue.
+
+  * Fair Share - The queue's fair share of resources. Queues may be allocated
+    resources beyond their fair share when other queues aren't using them. A
+    queue whose resource consumption lies at or below its fair share will
+    never have its containers preempted.
+
+  In addition to the information that the ResourceManager normally displays
+  about each application, the web interface includes the application's fair
+  share.