YARN-4743. FairSharePolicy breaks TimSort assumption. (Zephyr Guo and Yufei Gu via kasha)
(cherry picked from commit 4df8ed63ed
)
This commit is contained in:
parent
334fd9e83f
commit
950bfed1d3
|
@ -63,7 +63,11 @@ public class FairSharePolicy extends SchedulingPolicy {
|
||||||
*
|
*
|
||||||
* Schedulables above their min share are compared by (runningTasks / weight).
|
* Schedulables above their min share are compared by (runningTasks / weight).
|
||||||
* If all weights are equal, slots are given to the job with the fewest tasks;
|
* If all weights are equal, slots are given to the job with the fewest tasks;
|
||||||
* otherwise, jobs with more weight get proportionally more slots.
|
* otherwise, jobs with more weight get proportionally more slots. If weight
|
||||||
|
* equals to 0, we can't compare Schedulables by (resource usage/weight).
|
||||||
|
* There are two situations: 1)All weights equal to 0, slots are given
|
||||||
|
* to one with less resource usage. 2)Only one of weight equals to 0, slots
|
||||||
|
* are given to the one with non-zero weight.
|
||||||
*/
|
*/
|
||||||
private static class FairShareComparator implements Comparator<Schedulable>,
|
private static class FairShareComparator implements Comparator<Schedulable>,
|
||||||
Serializable {
|
Serializable {
|
||||||
|
@ -74,6 +78,7 @@ public class FairSharePolicy extends SchedulingPolicy {
|
||||||
public int compare(Schedulable s1, Schedulable s2) {
|
public int compare(Schedulable s1, Schedulable s2) {
|
||||||
double minShareRatio1, minShareRatio2;
|
double minShareRatio1, minShareRatio2;
|
||||||
double useToWeightRatio1, useToWeightRatio2;
|
double useToWeightRatio1, useToWeightRatio2;
|
||||||
|
double weight1, weight2;
|
||||||
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
|
Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
|
||||||
s1.getMinShare(), s1.getDemand());
|
s1.getMinShare(), s1.getDemand());
|
||||||
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
|
Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
|
||||||
|
@ -86,10 +91,26 @@ public class FairSharePolicy extends SchedulingPolicy {
|
||||||
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
|
/ Resources.max(RESOURCE_CALCULATOR, null, minShare1, ONE).getMemorySize();
|
||||||
minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
|
minShareRatio2 = (double) s2.getResourceUsage().getMemorySize()
|
||||||
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
|
/ Resources.max(RESOURCE_CALCULATOR, null, minShare2, ONE).getMemorySize();
|
||||||
useToWeightRatio1 = s1.getResourceUsage().getMemorySize() /
|
|
||||||
s1.getWeights().getWeight(ResourceType.MEMORY);
|
weight1 = s1.getWeights().getWeight(ResourceType.MEMORY);
|
||||||
useToWeightRatio2 = s2.getResourceUsage().getMemorySize() /
|
weight2 = s2.getWeights().getWeight(ResourceType.MEMORY);
|
||||||
s2.getWeights().getWeight(ResourceType.MEMORY);
|
if (weight1 > 0.0 && weight2 > 0.0) {
|
||||||
|
useToWeightRatio1 = s1.getResourceUsage().getMemorySize() / weight1;
|
||||||
|
useToWeightRatio2 = s2.getResourceUsage().getMemorySize() / weight2;
|
||||||
|
} else { // Either weight1 or weight2 equals to 0
|
||||||
|
if (weight1 == weight2) {
|
||||||
|
// If they have same weight, just compare usage
|
||||||
|
useToWeightRatio1 = s1.getResourceUsage().getMemorySize();
|
||||||
|
useToWeightRatio2 = s2.getResourceUsage().getMemorySize();
|
||||||
|
} else {
|
||||||
|
// By setting useToWeightRatios to negative weights, we give the
|
||||||
|
// zero-weight one less priority, so the non-zero weight one will
|
||||||
|
// be given slots.
|
||||||
|
useToWeightRatio1 = -weight1;
|
||||||
|
useToWeightRatio2 = -weight2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int res = 0;
|
int res = 0;
|
||||||
if (s1Needy && !s2Needy)
|
if (s1Needy && !s2Needy)
|
||||||
res = -1;
|
res = -1;
|
||||||
|
|
|
@ -21,13 +21,25 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Comparator;
|
||||||
|
import java.util.Stack;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Priority;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
public class TestSchedulingPolicy {
|
public class TestSchedulingPolicy {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestSchedulingPolicy.class);
|
||||||
|
|
||||||
@Test(timeout = 1000)
|
@Test(timeout = 1000)
|
||||||
public void testParseSchedulingPolicy()
|
public void testParseSchedulingPolicy()
|
||||||
|
@ -125,4 +137,220 @@ public class TestSchedulingPolicy {
|
||||||
assertFalse(ERR,
|
assertFalse(ERR,
|
||||||
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
SchedulingPolicy.isApplicableTo(policy, SchedulingPolicy.DEPTH_ANY));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test whether {@link FairSharePolicy.FairShareComparator} is transitive.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testFairShareComparatorTransitivity() {
|
||||||
|
FairSharePolicy policy = new FairSharePolicy();
|
||||||
|
Comparator<Schedulable> fairShareComparator = policy.getComparator();
|
||||||
|
FairShareComparatorTester tester =
|
||||||
|
new FairShareComparatorTester(fairShareComparator);
|
||||||
|
tester.testTransitivity();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is responsible for testing the transitivity of
|
||||||
|
* {@link FairSharePolicy.FairShareComparator}. We will generate
|
||||||
|
* a lot of triples(each triple contains three {@link Schedulable}),
|
||||||
|
* and then we verify transitivity by using each triple.
|
||||||
|
*
|
||||||
|
* <p>How to generate:</p>
|
||||||
|
* For each field in {@link Schedulable} we all have a data collection. We
|
||||||
|
* combine these data to construct a {@link Schedulable}, and generate all
|
||||||
|
* cases of triple by DFS(depth first search algorithm). We can get 100% code
|
||||||
|
* coverage by DFS.
|
||||||
|
*/
|
||||||
|
private class FairShareComparatorTester {
|
||||||
|
private Comparator<Schedulable> fairShareComparator;
|
||||||
|
|
||||||
|
// Use the following data collections to generate three Schedulable.
|
||||||
|
private Resource minShare = Resource.newInstance(0, 1);
|
||||||
|
|
||||||
|
private Resource demand = Resource.newInstance(4, 1);
|
||||||
|
|
||||||
|
private String[] nameCollection = {"A", "B", "C"};
|
||||||
|
|
||||||
|
private long[] startTimeColloection = {1L, 2L, 3L};
|
||||||
|
|
||||||
|
private Resource[] usageCollection = {
|
||||||
|
Resource.newInstance(0, 1), Resource.newInstance(2, 1),
|
||||||
|
Resource.newInstance(4, 1) };
|
||||||
|
|
||||||
|
private ResourceWeights[] weightsCollection = {
|
||||||
|
new ResourceWeights(0.0f), new ResourceWeights(1.0f),
|
||||||
|
new ResourceWeights(2.0f) };
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
public FairShareComparatorTester(
|
||||||
|
Comparator<Schedulable> fairShareComparator) {
|
||||||
|
this.fairShareComparator = fairShareComparator;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTransitivity() {
|
||||||
|
generateAndTest(new Stack<Schedulable>());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void generateAndTest(Stack<Schedulable> genSchedulable) {
|
||||||
|
if (genSchedulable.size() == 3) {
|
||||||
|
// We get three Schedulable objects, let's use them to check the
|
||||||
|
// comparator.
|
||||||
|
Assert.assertTrue("The comparator must ensure transitivity",
|
||||||
|
checkTransitivity(genSchedulable));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < nameCollection.length; i++) {
|
||||||
|
for (int j = 0; j < startTimeColloection.length; j++) {
|
||||||
|
for (int k = 0; k < usageCollection.length; k++) {
|
||||||
|
for (int t = 0; t < weightsCollection.length; t++) {
|
||||||
|
genSchedulable.push(createSchedulable(i, j, k, t));
|
||||||
|
generateAndTest(genSchedulable);
|
||||||
|
genSchedulable.pop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private Schedulable createSchedulable(
|
||||||
|
int nameIdx, int startTimeIdx, int usageIdx, int weightsIdx) {
|
||||||
|
return new MockSchedulable(minShare, demand, nameCollection[nameIdx],
|
||||||
|
startTimeColloection[startTimeIdx], usageCollection[usageIdx],
|
||||||
|
weightsCollection[weightsIdx]);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean checkTransitivity(
|
||||||
|
Collection<Schedulable> schedulableObjs) {
|
||||||
|
|
||||||
|
Assert.assertEquals(3, schedulableObjs.size());
|
||||||
|
Schedulable[] copy = schedulableObjs.toArray(new Schedulable[3]);
|
||||||
|
|
||||||
|
if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
|
||||||
|
swap(copy, 0, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fairShareComparator.compare(copy[1], copy[2]) > 0) {
|
||||||
|
swap(copy, 1, 2);
|
||||||
|
|
||||||
|
if (fairShareComparator.compare(copy[0], copy[1]) > 0) {
|
||||||
|
swap(copy, 0, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Here, we have got the following condition:
|
||||||
|
// copy[0] <= copy[1] && copy[1] <= copy[2]
|
||||||
|
//
|
||||||
|
// So, just check copy[0] <= copy[2]
|
||||||
|
if (fairShareComparator.compare(copy[0], copy[2]) <= 0) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
LOG.fatal("Failure data: " + copy[0] + " " + copy[1] + " " + copy[2]);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void swap(Schedulable[] array, int x, int y) {
|
||||||
|
Schedulable tmp = array[x];
|
||||||
|
array[x] = array[y];
|
||||||
|
array[y] = tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
private class MockSchedulable implements Schedulable {
|
||||||
|
private Resource minShare;
|
||||||
|
private Resource demand;
|
||||||
|
private String name;
|
||||||
|
private long startTime;
|
||||||
|
private Resource usage;
|
||||||
|
private ResourceWeights weights;
|
||||||
|
|
||||||
|
public MockSchedulable(Resource minShare, Resource demand, String name,
|
||||||
|
long startTime, Resource usage, ResourceWeights weights) {
|
||||||
|
this.minShare = minShare;
|
||||||
|
this.demand = demand;
|
||||||
|
this.name = name;
|
||||||
|
this.startTime = startTime;
|
||||||
|
this.usage = usage;
|
||||||
|
this.weights = weights;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getDemand() {
|
||||||
|
return demand;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getResourceUsage() {
|
||||||
|
return usage;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getMinShare() {
|
||||||
|
return minShare;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ResourceWeights getWeights() {
|
||||||
|
return weights;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getStartTime() {
|
||||||
|
return startTime;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getMaxShare() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Priority getPriority() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void updateDemand() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource assignContainer(FSSchedulerNode node) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RMContainer preemptContainer() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Resource getFairShare() {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setFairShare(Resource fairShare) {
|
||||||
|
throw new UnsupportedOperationException();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "{name:" + name + ", start:" + startTime + ", usage:" + usage +
|
||||||
|
", weights:" + weights + ", demand:" + demand +
|
||||||
|
", minShare:" + minShare + "}";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue