From f81a4efb8c40f99a9a6b7b42d3b6eeedf43eb27a Mon Sep 17 00:00:00 2001 From: Daniel Templeton Date: Tue, 25 Jul 2017 13:00:31 -0700 Subject: [PATCH] YARN-6307. Refactor FairShareComparator#compare (Contributed by Yufei Gu via Daniel Templeton) --- .../fair/policies/FairSharePolicy.java | 125 ++++++++++++------ 1 file changed, 81 insertions(+), 44 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 2a852aa11b9..0ef90a1d72f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -56,24 +56,28 @@ public class FairSharePolicy extends SchedulingPolicy { } /** - * Compare Schedulables via weighted fair sharing. In addition, Schedulables - * below their min share get priority over those whose min share is met. + * Compare Schedulables mainly via fair share usage to meet fairness. + * Specifically, it goes through following four steps. * - * Schedulables without resource demand get lower priority than - * ones who have demands. + * 1. Compare demands. Schedulables without resource demand get lower priority + * than ones who have demands. * - * Schedulables below their min share are compared by how far below it they - * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks - * and job B has 50 out of a min share of 100, then job B is scheduled next, - * because B is at 50% of its min share and A is at 80% of its min share. + * 2. Compare min share usage. Schedulables below their min share are compared + * by how far below it they are as a ratio. For example, if job A has 8 out + * of a min share of 10 tasks and job B has 50 out of a min share of 100, + * then job B is scheduled next, because B is at 50% of its min share and A + * is at 80% of its min share. * - * Schedulables above their min share are compared by (runningTasks / weight). + * 3. Compare fair share usage. Schedulables above their min share are + * compared by fair share usage by checking (resource usage / weight). * If all weights are equal, slots are given to the job with the fewest tasks; * otherwise, jobs with more weight get proportionally more slots. If weight * equals to 0, we can't compare Schedulables by (resource usage/weight). * There are two situations: 1)All weights equal to 0, slots are given * to one with less resource usage. 2)Only one of weight equals to 0, slots * are given to the one with non-zero weight. + * + * 4. Break the tie by compare submit time and job name. */ private static class FairShareComparator implements Comparator, Serializable { @@ -82,37 +86,88 @@ public class FairSharePolicy extends SchedulingPolicy { @Override public int compare(Schedulable s1, Schedulable s2) { + int res = compareDemand(s1, s2); + + // Pre-compute resource usages to avoid duplicate calculation + Resource resourceUsage1 = s1.getResourceUsage(); + Resource resourceUsage2 = s2.getResourceUsage(); + + if (res == 0) { + res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2); + } + + if (res == 0) { + res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2); + } + + // Break the tie by submit time + if (res == 0) { + res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); + } + + // Break the tie by job name + if (res == 0) { + res = s1.getName().compareTo(s2.getName()); + } + + return res; + } + + private int compareDemand(Schedulable s1, Schedulable s2) { + int res = 0; Resource demand1 = s1.getDemand(); Resource demand2 = s2.getDemand(); if (demand1.equals(Resources.none()) && Resources.greaterThan( RESOURCE_CALCULATOR, null, demand2, Resources.none())) { - return 1; + res = 1; } else if (demand2.equals(Resources.none()) && Resources.greaterThan( RESOURCE_CALCULATOR, null, demand1, Resources.none())) { - return -1; + res = -1; } + return res; + } - double minShareRatio1, minShareRatio2; - double useToWeightRatio1, useToWeightRatio2; - double weight1, weight2; - //Do not repeat the getResourceUsage calculation - Resource resourceUsage1 = s1.getResourceUsage(); - Resource resourceUsage2 = s2.getResourceUsage(); + private int compareMinShareUsage(Schedulable s1, Schedulable s2, + Resource resourceUsage1, Resource resourceUsage2) { + int res; Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null, - s1.getMinShare(), demand1); + s1.getMinShare(), s1.getDemand()); Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null, - s2.getMinShare(), demand2); + s2.getMinShare(), s2.getDemand()); boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, resourceUsage1, minShare1); boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, resourceUsage2, minShare2); - minShareRatio1 = (double) resourceUsage1.getMemorySize() - / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize(); - minShareRatio2 = (double) resourceUsage2.getMemorySize() - / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize(); - weight1 = s1.getWeights().getWeight(ResourceType.MEMORY); - weight2 = s2.getWeights().getWeight(ResourceType.MEMORY); + if (s1Needy && !s2Needy) { + res = -1; + } else if (s2Needy && !s1Needy) { + res = 1; + } else if (s1Needy && s2Needy) { + double minShareRatio1 = (double) resourceUsage1.getMemorySize() / + Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE) + .getMemorySize(); + double minShareRatio2 = (double) resourceUsage2.getMemorySize() / + Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE) + .getMemorySize(); + res = (int) Math.signum(minShareRatio1 - minShareRatio2); + } else { + res = 0; + } + + return res; + } + + /** + * To simplify computation, use weights instead of fair shares to calculate + * fair share usage. + */ + private int compareFairShareUsage(Schedulable s1, Schedulable s2, + Resource resourceUsage1, Resource resourceUsage2) { + double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY); + double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY); + double useToWeightRatio1; + double useToWeightRatio2; if (weight1 > 0.0 && weight2 > 0.0) { useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1; useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2; @@ -130,25 +185,7 @@ public class FairSharePolicy extends SchedulingPolicy { } } - int res = 0; - if (s1Needy && !s2Needy) - res = -1; - else if (s2Needy && !s1Needy) - res = 1; - else if (s1Needy && s2Needy) - res = (int) Math.signum(minShareRatio1 - minShareRatio2); - else - // Neither schedulable is needy - res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2); - if (res == 0) { - // Apps are tied in fairness ratio. Break the tie by submit time and job - // name to get a deterministic ordering, which is useful for unit tests. - res = (int) Math.signum(s1.getStartTime() - s2.getStartTime()); - if (res == 0) { - res = s1.getName().compareTo(s2.getName()); - } - } - return res; + return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2); } }