YARN-6307. Refactor FairShareComparator#compare (Contributed by Yufei Gu via Daniel Templeton)

This commit is contained in:
Daniel Templeton 2017-07-25 13:00:31 -07:00
parent ac9489f7fc
commit f81a4efb8c
1 changed files with 81 additions and 44 deletions

View File

@ -56,24 +56,28 @@ public class FairSharePolicy extends SchedulingPolicy {
} }
/** /**
* Compare Schedulables via weighted fair sharing. In addition, Schedulables * Compare Schedulables mainly via fair share usage to meet fairness.
* below their min share get priority over those whose min share is met. * Specifically, it goes through following four steps.
* *
* Schedulables without resource demand get lower priority than * 1. Compare demands. Schedulables without resource demand get lower priority
* ones who have demands. * than ones who have demands.
* *
* Schedulables below their min share are compared by how far below it they * 2. Compare min share usage. Schedulables below their min share are compared
* are as a ratio. For example, if job A has 8 out of a min share of 10 tasks * by how far below it they are as a ratio. For example, if job A has 8 out
* and job B has 50 out of a min share of 100, then job B is scheduled next, * of a min share of 10 tasks and job B has 50 out of a min share of 100,
* because B is at 50% of its min share and A is at 80% of its min share. * then job B is scheduled next, because B is at 50% of its min share and A
* is at 80% of its min share.
* *
* Schedulables above their min share are compared by (runningTasks / weight). * 3. Compare fair share usage. Schedulables above their min share are
* compared by fair share usage by checking (resource usage / weight).
* If all weights are equal, slots are given to the job with the fewest tasks; * If all weights are equal, slots are given to the job with the fewest tasks;
* otherwise, jobs with more weight get proportionally more slots. If weight * otherwise, jobs with more weight get proportionally more slots. If weight
* equals to 0, we can't compare Schedulables by (resource usage/weight). * equals to 0, we can't compare Schedulables by (resource usage/weight).
* There are two situations: 1)All weights equal to 0, slots are given * There are two situations: 1)All weights equal to 0, slots are given
* to one with less resource usage. 2)Only one of weight equals to 0, slots * to one with less resource usage. 2)Only one of weight equals to 0, slots
* are given to the one with non-zero weight. * are given to the one with non-zero weight.
*
* 4. Break the tie by compare submit time and job name.
*/ */
private static class FairShareComparator implements Comparator<Schedulable>, private static class FairShareComparator implements Comparator<Schedulable>,
Serializable { Serializable {
@ -82,37 +86,88 @@ public class FairSharePolicy extends SchedulingPolicy {
@Override @Override
public int compare(Schedulable s1, Schedulable s2) { public int compare(Schedulable s1, Schedulable s2) {
int res = compareDemand(s1, s2);
// Pre-compute resource usages to avoid duplicate calculation
Resource resourceUsage1 = s1.getResourceUsage();
Resource resourceUsage2 = s2.getResourceUsage();
if (res == 0) {
res = compareMinShareUsage(s1, s2, resourceUsage1, resourceUsage2);
}
if (res == 0) {
res = compareFairShareUsage(s1, s2, resourceUsage1, resourceUsage2);
}
// Break the tie by submit time
if (res == 0) {
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
}
// Break the tie by job name
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
return res;
}
private int compareDemand(Schedulable s1, Schedulable s2) {
int res = 0;
Resource demand1 = s1.getDemand(); Resource demand1 = s1.getDemand();
Resource demand2 = s2.getDemand(); Resource demand2 = s2.getDemand();
if (demand1.equals(Resources.none()) && Resources.greaterThan( if (demand1.equals(Resources.none()) && Resources.greaterThan(
RESOURCE_CALCULATOR, null, demand2, Resources.none())) { RESOURCE_CALCULATOR, null, demand2, Resources.none())) {
return 1; res = 1;
} else if (demand2.equals(Resources.none()) && Resources.greaterThan( } else if (demand2.equals(Resources.none()) && Resources.greaterThan(
RESOURCE_CALCULATOR, null, demand1, Resources.none())) { RESOURCE_CALCULATOR, null, demand1, Resources.none())) {
return -1; res = -1;
}
return res;
} }
double minShareRatio1, minShareRatio2; private int compareMinShareUsage(Schedulable s1, Schedulable s2,
double useToWeightRatio1, useToWeightRatio2; Resource resourceUsage1, Resource resourceUsage2) {
double weight1, weight2; int res;
//Do not repeat the getResourceUsage calculation
Resource resourceUsage1 = s1.getResourceUsage();
Resource resourceUsage2 = s2.getResourceUsage();
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null, Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
s1.getMinShare(), demand1); s1.getMinShare(), s1.getDemand());
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null, Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
s2.getMinShare(), demand2); s2.getMinShare(), s2.getDemand());
boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
resourceUsage1, minShare1); resourceUsage1, minShare1);
boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null, boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
resourceUsage2, minShare2); resourceUsage2, minShare2);
minShareRatio1 = (double) resourceUsage1.getMemorySize()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
minShareRatio2 = (double) resourceUsage2.getMemorySize()
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
weight1 = s1.getWeights().getWeight(ResourceType.MEMORY); if (s1Needy && !s2Needy) {
weight2 = s2.getWeights().getWeight(ResourceType.MEMORY); res = -1;
} else if (s2Needy && !s1Needy) {
res = 1;
} else if (s1Needy && s2Needy) {
double minShareRatio1 = (double) resourceUsage1.getMemorySize() /
Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE)
.getMemorySize();
double minShareRatio2 = (double) resourceUsage2.getMemorySize() /
Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE)
.getMemorySize();
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
} else {
res = 0;
}
return res;
}
/**
* To simplify computation, use weights instead of fair shares to calculate
* fair share usage.
*/
private int compareFairShareUsage(Schedulable s1, Schedulable s2,
Resource resourceUsage1, Resource resourceUsage2) {
double weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
double weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
double useToWeightRatio1;
double useToWeightRatio2;
if (weight1 > 0.0 && weight2 > 0.0) { if (weight1 > 0.0 && weight2 > 0.0) {
useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1; useToWeightRatio1 = resourceUsage1.getMemorySize() / weight1;
useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2; useToWeightRatio2 = resourceUsage2.getMemorySize() / weight2;
@ -130,25 +185,7 @@ public class FairSharePolicy extends SchedulingPolicy {
} }
} }
int res = 0; return (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (s1Needy && !s2Needy)
res = -1;
else if (s2Needy && !s1Needy)
res = 1;
else if (s1Needy && s2Needy)
res = (int) Math.signum(minShareRatio1 - minShareRatio2);
else
// Neither schedulable is needy
res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
if (res == 0) {
// Apps are tied in fairness ratio. Break the tie by submit time and job
// name to get a deterministic ordering, which is useful for unit tests.
res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
if (res == 0) {
res = s1.getName().compareTo(s2.getName());
}
}
return res;
} }
} }