diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java index 6aa8405ff43..f120f0fc374 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/policies/FairSharePolicy.java @@ -63,7 +63,11 @@ public class FairSharePolicy extends SchedulingPolicy { * * Schedulables above their min share are compared by (runningTasks / weight). * If all weights are equal, slots are given to the job with the fewest tasks; - * otherwise, jobs with more weight get proportionally more slots. + * otherwise, jobs with more weight get proportionally more slots. If weight + * equals to 0, we can't compare Schedulables by (resource usage/weight). + * There are two situations: 1)All weights equal to 0, slots are given + * to one with less resource usage. 2)Only one of weight equals to 0, slots + * are given to the one with non-zero weight. */ private static class FairShareComparator implements Comparator, Serializable { @@ -74,6 +78,7 @@ public class FairSharePolicy extends SchedulingPolicy { public int compare(Schedulable s1, Schedulable s2) { double minShareRatio1, minShareRatio2; double useToWeightRatio1, useToWeightRatio2; + double weight1, weight2; Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null, s1.getMinShare(), s1.getDemand()); Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null, @@ -86,10 +91,26 @@ public class FairSharePolicy extends SchedulingPolicy { / Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize(); minShareRatio2 = (double) s2.getResourceUsage().getMemorySize() / Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize(); - useToWeightRatio1 = s1.getResourceUsage().getMemorySize() / - s1.getWeights().getWeight(ResourceType.MEMORY); - useToWeightRatio2 = s2.getResourceUsage().getMemorySize() / - s2.getWeights().getWeight(ResourceType.MEMORY); + + weight1 = s1.getWeights().getWeight(ResourceType.MEMORY); + weight2 = s2.getWeights().getWeight(ResourceType.MEMORY); + if (weight1 > 0.0 && weight2 > 0.0) { + useToWeightRatio1 = s1.getResourceUsage().getMemorySize() / weight1; + useToWeightRatio2 = s2.getResourceUsage().getMemorySize() / weight2; + } else { // Either weight1 or weight2 equals to 0 + if (weight1 == weight2) { + // If they have same weight, just compare usage + useToWeightRatio1 = s1.getResourceUsage().getMemorySize(); + useToWeightRatio2 = s2.getResourceUsage().getMemorySize(); + } else { + // By setting useToWeightRatios to negative weights, we give the + // zero-weight one less priority, so the non-zero weight one will + // be given slots. + useToWeightRatio1 = -weight1; + useToWeightRatio2 = -weight2; + } + } + int res = 0; if (s1Needy && !s2Needy) res = -1; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java index eeedb09fc46..dea2dd1173c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestSchedulingPolicy.java @@ -21,13 +21,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.Collection; +import java.util.Comparator; +import java.util.Stack; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights; +import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; public class TestSchedulingPolicy { + private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class); @Test(timeout = 1000) public void testParseSchedulingPolicy() @@ -125,4 +137,220 @@ public class TestSchedulingPolicy { assertFalse(ERR, SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY)); } + + /** + * Test whether {@link FairSharePolicy.FairShareComparator} is transitive. + */ + @Test + public void testFairShareComparatorTransitivity() { + FairSharePolicy policy = new FairSharePolicy(); + Comparator fairShareComparator = policy.getComparator(); + FairShareComparatorTester tester = + new FairShareComparatorTester(fairShareComparator); + tester.testTransitivity(); + } + + + /** + * This class is responsible for testing the transitivity of + * {@link FairSharePolicy.FairShareComparator}. We will generate + * a lot of triples(each triple contains three {@link Schedulable}), + * and then we verify transitivity by using each triple. + * + *

How to generate:

+ * For each field in {@link Schedulable} we all have a data collection. We + * combine these data to construct a {@link Schedulable}, and generate all + * cases of triple by DFS(depth first search algorithm). We can get 100% code + * coverage by DFS. + */ + private class FairShareComparatorTester { + private Comparator fairShareComparator; + + // Use the following data collections to generate three Schedulable. + private Resource minShare = Resource.newInstance(0, 1); + + private Resource demand = Resource.newInstance(4, 1); + + private String[] nameCollection = {"A", "B", "C"}; + + private long[] startTimeColloection = {1L, 2L, 3L}; + + private Resource[] usageCollection = { + Resource.newInstance(0, 1), Resource.newInstance(2, 1), + Resource.newInstance(4, 1) }; + + private ResourceWeights[] weightsCollection = { + new ResourceWeights(0.0f), new ResourceWeights(1.0f), + new ResourceWeights(2.0f) }; + + + + public FairShareComparatorTester( + Comparator fairShareComparator) { + this.fairShareComparator = fairShareComparator; + } + + public void testTransitivity() { + generateAndTest(new Stack()); + } + + private void generateAndTest(Stack genSchedulable) { + if (genSchedulable.size() == 3) { + // We get three Schedulable objects, let's use them to check the + // comparator. + Assert.assertTrue("The comparator must ensure transitivity", + checkTransitivity(genSchedulable)); + return; + } + + for (int i = 0; i < nameCollection.length; i++) { + for (int j = 0; j < startTimeColloection.length; j++) { + for (int k = 0; k < usageCollection.length; k++) { + for (int t = 0; t < weightsCollection.length; t++) { + genSchedulable.push(createSchedulable(i, j, k, t)); + generateAndTest(genSchedulable); + genSchedulable.pop(); + } + } + } + } + + } + + private Schedulable createSchedulable( + int nameIdx, int startTimeIdx, int usageIdx, int weightsIdx) { + return new MockSchedulable(minShare, demand, nameCollection[nameIdx], + startTimeColloection[startTimeIdx], usageCollection[usageIdx], + weightsCollection[weightsIdx]); + } + + private boolean checkTransitivity( + Collection schedulableObjs) { + + Assert.assertEquals(3, schedulableObjs.size()); + Schedulable[] copy = schedulableObjs.toArray(new Schedulable[3]); + + if (fairShareComparator.compare(copy[0], copy[1]) > 0) { + swap(copy, 0, 1); + } + + if (fairShareComparator.compare(copy[1], copy[2]) > 0) { + swap(copy, 1, 2); + + if (fairShareComparator.compare(copy[0], copy[1]) > 0) { + swap(copy, 0, 1); + } + } + + // Here, we have got the following condition: + // copy[0] <= copy[1] && copy[1] <= copy[2] + // + // So, just check copy[0] <= copy[2] + if (fairShareComparator.compare(copy[0], copy[2]) <= 0) { + return true; + } else { + LOG.fatal("Failure data: " + copy[0] + " " + copy[1] + " " + copy[2]); + return false; + } + } + + private void swap(Schedulable[] array, int x, int y) { + Schedulable tmp = array[x]; + array[x] = array[y]; + array[y] = tmp; + } + + + private class MockSchedulable implements Schedulable { + private Resource minShare; + private Resource demand; + private String name; + private long startTime; + private Resource usage; + private ResourceWeights weights; + + public MockSchedulable(Resource minShare, Resource demand, String name, + long startTime, Resource usage, ResourceWeights weights) { + this.minShare = minShare; + this.demand = demand; + this.name = name; + this.startTime = startTime; + this.usage = usage; + this.weights = weights; + } + + @Override + public String getName() { + return name; + } + + @Override + public Resource getDemand() { + return demand; + } + + @Override + public Resource getResourceUsage() { + return usage; + } + + @Override + public Resource getMinShare() { + return minShare; + } + + @Override + public ResourceWeights getWeights() { + return weights; + } + + @Override + public long getStartTime() { + return startTime; + } + + @Override + public Resource getMaxShare() { + throw new UnsupportedOperationException(); + } + + @Override + public Priority getPriority() { + throw new UnsupportedOperationException(); + } + + @Override + public void updateDemand() { + throw new UnsupportedOperationException(); + } + + @Override + public Resource assignContainer(FSSchedulerNode node) { + throw new UnsupportedOperationException(); + } + + @Override + public RMContainer preemptContainer() { + throw new UnsupportedOperationException(); + } + + @Override + public Resource getFairShare() { + throw new UnsupportedOperationException(); + } + + @Override + public void setFairShare(Resource fairShare) { + throw new UnsupportedOperationException(); + } + + @Override + public String toString() { + return "{name:" + name + ", start:" + startTime + ", usage:" + usage + + ", weights:" + weights + ", demand:" + demand + + ", minShare:" + minShare + "}"; + } + } + } + }