YARN-9173. FairShare calculation broken for large values after YARN-8833. Contributed by Wilfred Spiegelenburg.
(cherry picked from commit 944cf87223
)
This commit is contained in:
parent
cffe5c1ba0
commit
2b549e32e1
|
@ -24,6 +24,8 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
||||
|
||||
import static java.lang.Math.addExact;
|
||||
|
||||
/**
|
||||
* Contains logic for computing the fair shares. A {@link Schedulable}'s fair
|
||||
* share is {@link Resource} it is entitled to, independent of the current
|
||||
|
@ -31,10 +33,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.Schedulable;
|
|||
* consumption lies at or below its fair share will never have its containers
|
||||
* preempted.
|
||||
*/
|
||||
public class ComputeFairShares {
|
||||
public final class ComputeFairShares {
|
||||
|
||||
private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
|
||||
|
||||
private ComputeFairShares() {
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute fair share of the given schedulables.Fair share is an allocation of
|
||||
* shares considering only active schedulables ie schedulables which have
|
||||
|
@ -100,19 +105,20 @@ public class ComputeFairShares {
|
|||
* all Schedulables are only given their minShare) and an upper bound computed
|
||||
* to be large enough that too many slots are given (by doubling R until we
|
||||
* use more than totalResources resources). The helper method
|
||||
* resourceUsedWithWeightToResourceRatio computes the total resources used with a
|
||||
* given value of R.
|
||||
* resourceUsedWithWeightToResourceRatio computes the total resources used
|
||||
* with a given value of R.
|
||||
* <p>
|
||||
* The running time of this algorithm is linear in the number of Schedulables,
|
||||
* because resourceUsedWithWeightToResourceRatio is linear-time and the number of
|
||||
* iterations of binary search is a constant (dependent on desired precision).
|
||||
* because resourceUsedWithWeightToResourceRatio is linear-time and the
|
||||
* number of iterations of binary search is a constant (dependent on desired
|
||||
* precision).
|
||||
*/
|
||||
private static void computeSharesInternal(
|
||||
Collection<? extends Schedulable> allSchedulables,
|
||||
Resource totalResources, String type, boolean isSteadyShare) {
|
||||
|
||||
Collection<Schedulable> schedulables = new ArrayList<>();
|
||||
int takenResources = handleFixedFairShares(
|
||||
long takenResources = handleFixedFairShares(
|
||||
allSchedulables, schedulables, isSteadyShare, type);
|
||||
|
||||
if (schedulables.isEmpty()) {
|
||||
|
@ -121,12 +127,11 @@ public class ComputeFairShares {
|
|||
// Find an upper bound on R that we can use in our binary search. We start
|
||||
// at R = 1 and double it until we have either used all the resources or we
|
||||
// have met all Schedulables' max shares.
|
||||
int totalMaxShare = 0;
|
||||
long totalMaxShare = 0;
|
||||
for (Schedulable sched : schedulables) {
|
||||
long maxShare = sched.getMaxShare().getResourceValue(type);
|
||||
totalMaxShare = (int) Math.min(maxShare + (long)totalMaxShare,
|
||||
Integer.MAX_VALUE);
|
||||
if (totalMaxShare == Integer.MAX_VALUE) {
|
||||
totalMaxShare = safeAdd(maxShare, totalMaxShare);
|
||||
if (totalMaxShare == Long.MAX_VALUE) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -166,23 +171,24 @@ public class ComputeFairShares {
|
|||
target = sched.getFairShare();
|
||||
}
|
||||
|
||||
target.setResourceValue(type, (long)computeShare(sched, right, type));
|
||||
target.setResourceValue(type, computeShare(sched, right, type));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Compute the resources that would be used given a weight-to-resource ratio
|
||||
* w2rRatio, for use in the computeFairShares algorithm as described in #
|
||||
* w2rRatio, for use in the computeFairShares algorithm as described in
|
||||
* {@link #computeSharesInternal}.
|
||||
*/
|
||||
private static long resourceUsedWithWeightToResourceRatio(double w2rRatio,
|
||||
Collection<? extends Schedulable> schedulables, String type) {
|
||||
long resourcesTaken = 0;
|
||||
for (Schedulable sched : schedulables) {
|
||||
long share = computeShare(sched, w2rRatio, type);
|
||||
if (Long.MAX_VALUE - resourcesTaken < share) {
|
||||
return Long.MAX_VALUE;
|
||||
resourcesTaken = safeAdd(resourcesTaken, share);
|
||||
if (resourcesTaken == Long.MAX_VALUE) {
|
||||
break;
|
||||
}
|
||||
resourcesTaken += share;
|
||||
}
|
||||
return resourcesTaken;
|
||||
}
|
||||
|
@ -204,11 +210,11 @@ public class ComputeFairShares {
|
|||
* Returns the resources taken by fixed fairshare schedulables,
|
||||
* and adds the remaining to the passed nonFixedSchedulables.
|
||||
*/
|
||||
private static int handleFixedFairShares(
|
||||
private static long handleFixedFairShares(
|
||||
Collection<? extends Schedulable> schedulables,
|
||||
Collection<Schedulable> nonFixedSchedulables,
|
||||
boolean isSteadyShare, String type) {
|
||||
int totalResource = 0;
|
||||
long totalResource = 0;
|
||||
|
||||
for (Schedulable sched : schedulables) {
|
||||
long fixedShare = getFairShareIfFixed(sched, isSteadyShare, type);
|
||||
|
@ -224,15 +230,15 @@ public class ComputeFairShares {
|
|||
}
|
||||
|
||||
target.setResourceValue(type, fixedShare);
|
||||
totalResource = (int) Math.min((long)totalResource + (long)fixedShare,
|
||||
Integer.MAX_VALUE);
|
||||
totalResource = safeAdd(totalResource, fixedShare);
|
||||
}
|
||||
}
|
||||
return totalResource;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the fairshare for the {@link Schedulable} if it is fixed, -1 otherwise.
|
||||
* Get the fairshare for the {@link Schedulable} if it is fixed,
|
||||
* -1 otherwise.
|
||||
*
|
||||
* The fairshare is fixed if either the maxShare is 0, weight is 0,
|
||||
* or the Schedulable is not active for instantaneous fairshare.
|
||||
|
@ -259,4 +265,20 @@ public class ComputeFairShares {
|
|||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Safely add two long values. The result will always be a valid long value.
|
||||
* If the addition caused an overflow the return value will be set to
|
||||
* <code>Long.MAX_VALUE</code>.
|
||||
* @param a first long to add
|
||||
* @param b second long to add
|
||||
* @return result of the addition
|
||||
*/
|
||||
private static long safeAdd(long a, long b) {
|
||||
try {
|
||||
return addExact(a, b);
|
||||
} catch (ArithmeticException ae) {
|
||||
return Long.MAX_VALUE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -70,7 +70,18 @@ public class FakeSchedulable implements Schedulable {
|
|||
weights, Resources.createResource(0, 0),
|
||||
Resources.createResource(0, 0), 0);
|
||||
}
|
||||
|
||||
|
||||
public FakeSchedulable(long minShare, long maxShare) {
|
||||
this(minShare, maxShare, 1L);
|
||||
}
|
||||
|
||||
public FakeSchedulable(long minShare, long maxShare, float weights) {
|
||||
this(Resources.createResource(minShare, 0),
|
||||
Resources.createResource(maxShare, 0),
|
||||
weights, Resources.createResource(0, 0),
|
||||
Resources.createResource(0, 0), 0);
|
||||
}
|
||||
|
||||
public FakeSchedulable(Resource minShare, Resource maxShare,
|
||||
float weight, Resource fairShare, Resource usage, long startTime) {
|
||||
this.minShare = minShare;
|
||||
|
@ -146,8 +157,8 @@ public class FakeSchedulable implements Schedulable {
|
|||
return true;
|
||||
}
|
||||
|
||||
public void setResourceUsage(Resource usage) {
|
||||
this.usage = usage;
|
||||
public void setResourceUsage(Resource resourceUsage) {
|
||||
this.usage = resourceUsage;
|
||||
}
|
||||
|
||||
public final void start(long time) {
|
||||
|
|
|
@ -19,10 +19,8 @@
|
|||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceInformation;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
@ -40,7 +38,7 @@ public class TestComputeFairShares {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
scheds = new ArrayList<Schedulable>();
|
||||
scheds = new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -149,22 +147,72 @@ public class TestComputeFairShares {
|
|||
}
|
||||
|
||||
/**
|
||||
* Test that shares are computed accurately even when the number of slots is
|
||||
* very large.
|
||||
* Test that shares are computed accurately even when the number of
|
||||
* resources is very large.
|
||||
* Test adapted to accommodate long values for resources.
|
||||
*/
|
||||
@Test
|
||||
public void testLargeShares() {
|
||||
int million = 1000 * 1000;
|
||||
scheds.add(new FakeSchedulable());
|
||||
scheds.add(new FakeSchedulable());
|
||||
scheds.add(new FakeSchedulable());
|
||||
scheds.add(new FakeSchedulable());
|
||||
long giga = 1000L * 1000L * 1000L * 4L;
|
||||
scheds.add(new FakeSchedulable(0L, giga));
|
||||
scheds.add(new FakeSchedulable(0L, giga));
|
||||
scheds.add(new FakeSchedulable(0L, giga));
|
||||
scheds.add(new FakeSchedulable(0L, giga));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(40 * million),
|
||||
Resources.createResource(4 * giga),
|
||||
ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(10 * million, 10 * million, 10 * million, 10 * million);
|
||||
verifyMemoryShares(giga, giga, giga, giga);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test overflow in the resources taken and upper bound.
|
||||
*/
|
||||
@Test
|
||||
public void testLargeMinimums() {
|
||||
long giga = 1000L * 1000L * 1000L * 4L;
|
||||
scheds.add(new FakeSchedulable(Long.MAX_VALUE, Long.MAX_VALUE));
|
||||
scheds.add(new FakeSchedulable(giga, giga));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(4 * giga),
|
||||
ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(Long.MAX_VALUE, giga);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test overflow in the upper bound calculation for the binary search.
|
||||
*/
|
||||
@Test
|
||||
public void testOverflowMaxShare() {
|
||||
long giga = 1000L * 1000L * 1000L;
|
||||
scheds.add(new FakeSchedulable(0L, giga));
|
||||
scheds.add(new FakeSchedulable(0L, Long.MAX_VALUE));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(2 * giga),
|
||||
ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(giga, giga);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test overflow in the fixed share calculations. The 3th schedulable should
|
||||
* not get any share as all resources are taken by the handleFixedShare()
|
||||
* call.
|
||||
* With the overflow it looked like there were more resources available then
|
||||
* there really are.
|
||||
* The values in the test might not be "real" but they show the overflow.
|
||||
*/
|
||||
@Test
|
||||
public void testOverflowFixedShare() {
|
||||
long giga = 1000L * 1000L * 1000L;
|
||||
long minValue = Long.MAX_VALUE - 1L;
|
||||
scheds.add(new FakeSchedulable(giga, giga, 0));
|
||||
scheds.add(new FakeSchedulable(minValue, Long.MAX_VALUE, 0));
|
||||
scheds.add(new FakeSchedulable(0L, giga));
|
||||
ComputeFairShares.computeShares(scheds,
|
||||
Resources.createResource(1000L),
|
||||
ResourceInformation.MEMORY_MB.getName());
|
||||
verifyMemoryShares(giga, minValue, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that being called on an empty list doesn't confuse the algorithm.
|
||||
*/
|
||||
|
@ -176,7 +224,7 @@ public class TestComputeFairShares {
|
|||
}
|
||||
|
||||
/**
|
||||
* Test that CPU works as well as memory
|
||||
* Test that CPU works as well as memory.
|
||||
*/
|
||||
@Test
|
||||
public void testCPU() {
|
||||
|
@ -192,10 +240,12 @@ public class TestComputeFairShares {
|
|||
/**
|
||||
* Check that a given list of shares have been assigned to this.scheds.
|
||||
*/
|
||||
private void verifyMemoryShares(int... shares) {
|
||||
Assert.assertEquals(scheds.size(), shares.length);
|
||||
private void verifyMemoryShares(long... shares) {
|
||||
Assert.assertEquals("Number of shares and schedulables are not consistent",
|
||||
scheds.size(), shares.length);
|
||||
for (int i = 0; i < shares.length; i++) {
|
||||
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getMemorySize());
|
||||
Assert.assertEquals("Expected share number " + i + " in list wrong",
|
||||
shares[i], scheds.get(i).getFairShare().getMemorySize());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -203,23 +253,11 @@ public class TestComputeFairShares {
|
|||
* Check that a given list of shares have been assigned to this.scheds.
|
||||
*/
|
||||
private void verifyCPUShares(int... shares) {
|
||||
Assert.assertEquals(scheds.size(), shares.length);
|
||||
Assert.assertEquals("Number of shares and schedulables are not consistent",
|
||||
scheds.size(), shares.length);
|
||||
for (int i = 0; i < shares.length; i++) {
|
||||
Assert.assertEquals(shares[i], scheds.get(i).getFairShare().getVirtualCores());
|
||||
Assert.assertEquals("Expected share number " + i + " in list wrong",
|
||||
shares[i], scheds.get(i).getFairShare().getVirtualCores());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test computeShares will not enter into infinite loop.
|
||||
*/
|
||||
@Test(timeout = 10000)
|
||||
public void testResourceUsedWithWeightToResourceRatio() {
|
||||
Collection<Schedulable> schedulables = new ArrayList<>();
|
||||
schedulables.add(new FakeSchedulable(Integer.MAX_VALUE));
|
||||
schedulables.add(new FakeSchedulable(Integer.MAX_VALUE));
|
||||
|
||||
Resource totalResource = Resource.newInstance(Integer.MAX_VALUE, 0);
|
||||
ComputeFairShares.computeShares(
|
||||
schedulables, totalResource, ResourceInformation.MEMORY_URI);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue